This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1cc1e776f7 KAFKA-14095: Improve handling of sync offset failures in
MirrorMaker (#12432)
1cc1e776f7 is described below
commit 1cc1e776f703b180f4bd979e8a551805b3bdc94e
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Aug 1 12:59:41 2022 +0200
KAFKA-14095: Improve handling of sync offset failures in MirrorMaker
(#12432)
We should not treat UNKNOWN_MEMBER_ID as an unexpected error in the Admin
client. In MirrorMaker, check the result of committing offsets and log a
useful error message if the commit fails with UNKNOWN_MEMBER_ID.
Reviewers: Chris Egerton <[email protected]>
---
.../internals/AlterConsumerGroupOffsetsHandler.java | 2 ++
.../kafka/connect/mirror/MirrorCheckpointTask.java | 17 ++++++++++++++---
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
index eab2e2bb73..425ed66bd2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
@@ -179,6 +179,8 @@ public class AlterConsumerGroupOffsetsHandler extends
AdminApiHandler.Batched<Co
case INVALID_GROUP_ID:
case INVALID_COMMIT_OFFSET_SIZE:
case GROUP_AUTHORIZATION_FAILED:
+ // Member level errors.
+ case UNKNOWN_MEMBER_ID:
log.debug("OffsetCommit request for group id {} failed due to
error {}.",
groupId.idValue, error);
partitionResults.put(topicPartition, error);
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 959961812e..3e6247334b 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -17,9 +17,11 @@
package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Schema;
@@ -306,9 +308,18 @@ public class MirrorCheckpointTask extends SourceTask {
void syncGroupOffset(String consumerGroupId, Map<TopicPartition,
OffsetAndMetadata> offsetToSync) {
if (targetAdminClient != null) {
- targetAdminClient.alterConsumerGroupOffsets(consumerGroupId,
offsetToSync);
- log.trace("sync-ed the offset for consumer group: {} with {}
number of offset entries",
- consumerGroupId, offsetToSync.size());
+ AlterConsumerGroupOffsetsResult result =
targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
+ result.all().whenComplete((v, throwable) -> {
+ if (throwable != null) {
+ if (throwable.getCause() instanceof
UnknownMemberIdException) {
+ log.warn("Unable to sync offsets for consumer group
{}. This is likely caused by consumers currently using this group in the target
cluster.", consumerGroupId);
+ } else {
+ log.error("Unable to sync offsets for consumer group
{}.", consumerGroupId, throwable);
+ }
+ } else {
+ log.trace("Sync-ed {} offsets for consumer group {}.",
offsetToSync.size(), consumerGroupId);
+ }
+ });
}
}