C0urante commented on code in PR #12432:
URL: https://github.com/apache/kafka/pull/12432#discussion_r932857580
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -306,7 +308,16 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) {
if (targetAdminClient != null) {
- targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
+ AlterConsumerGroupOffsetsResult result = targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
+ result.all().whenComplete((v, throwable) -> {
+ if (throwable != null) {
+ if (throwable.getCause() instanceof UnknownMemberIdException) {
+ log.warn("Unable to sync offsets for consumer group {}. This is likely caused by consumers currently using this group in the target cluster.", consumerGroupId);
+ } else {
+ log.error("Unable to sync offsets for consumer group {}.", consumerGroupId, throwable);
+ }
+ }
+ });
log.trace("sync-ed the offset for consumer group: {} with {} number of offset entries",
Review Comment:
Should we update this log message since it's not guaranteed that the sync
will have completed by this point, or that it will even complete successfully
at all?
Or alternatively, could we keep the message as-is, but move it into an
`else` block in the callback we pass to `whenComplete`?
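A rough sketch of the second option, in case it helps the discussion. It is not the PR's code: `CompletableFuture` stands in for the future returned by `result.all()`, a plain `List<String>` stands in for the logger, and `SyncLogSketch` and `logOutcome` are hypothetical names. The point is only that the success message is emitted from the `else` branch of the callback, so it can no longer appear before the request finishes or after it fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch only: CompletableFuture stands in for the future returned by
// AlterConsumerGroupOffsetsResult.all(); the List stands in for the logger.
class SyncLogSketch {
    // Hypothetical helper, not part of MirrorCheckpointTask.
    static void logOutcome(CompletableFuture<Void> syncResult, String groupId,
                           int entryCount, List<String> log) {
        syncResult.whenComplete((v, throwable) -> {
            if (throwable != null) {
                log.add("ERROR unable to sync offsets for consumer group " + groupId);
            } else {
                // Success is only known here, once the future has completed.
                log.add("TRACE sync-ed the offset for consumer group: " + groupId
                        + " with " + entryCount + " number of offset entries");
            }
        });
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> pending = new CompletableFuture<>();
        logOutcome(pending, "group-a", 3, log);
        // Nothing has been logged yet because the request is still in flight.
        System.out.println("entries before completion: " + log.size());
        pending.complete(null);
        System.out.println(log);
    }
}
```

With this shape, the trace message and the error message are mutually exclusive for a given request, which is what the current placement after the `alterConsumerGroupOffsets` call cannot guarantee.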
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java:
##########
@@ -179,6 +179,9 @@ private void handleError(
case INVALID_GROUP_ID:
case INVALID_COMMIT_OFFSET_SIZE:
case GROUP_AUTHORIZATION_FAILED:
+ // Member level errors.
+ case UNKNOWN_MEMBER_ID:
+ case FENCED_INSTANCE_ID:
Review Comment:
When would we encounter this error? My understanding is that it'll only be
returned when the broker receives a request with a group instance ID defined,
and IIUC it's not possible to define one with the admin API we expose right now.
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -306,7 +308,16 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) {
if (targetAdminClient != null) {
- targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
+ AlterConsumerGroupOffsetsResult result = targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
+ result.all().whenComplete((v, throwable) -> {
+ if (throwable != null) {
+ if (throwable.getCause() instanceof UnknownMemberIdException) {
Review Comment:
How stable is the API for the exceptions we'll see here? Can we be
reasonably certain that the exception we want to examine will always be wrapped?
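If the wrapping turns out not to be something we can rely on, one defensive option is to check the throwable itself and everything in its cause chain, rather than only `getCause()`. The sketch below is illustrative and uses only the JDK: `UnknownMemberIdStandIn` is a hypothetical stand-in for `UnknownMemberIdException`, and `CauseCheckSketch` and `hasCause` are made-up names, not existing Kafka utilities.

```java
import java.util.concurrent.ExecutionException;

class CauseCheckSketch {
    // Hypothetical stand-in for org.apache.kafka.common.errors.UnknownMemberIdException.
    static class UnknownMemberIdStandIn extends RuntimeException {
        UnknownMemberIdStandIn(String message) {
            super(message);
        }
    }

    // Returns true if the throwable, or anything in its cause chain, is an instance of type.
    static boolean hasCause(Throwable throwable, Class<? extends Throwable> type) {
        Throwable current = throwable;
        int depth = 0; // bounded walk, as a guard against cyclic cause chains
        while (current != null && depth < 16) {
            if (type.isInstance(current)) {
                return true;
            }
            current = current.getCause();
            depth++;
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable bare = new UnknownMemberIdStandIn("unknown member");
        Throwable wrapped = new ExecutionException(bare);
        // Both shapes are recognised, so the check does not depend on wrapping.
        System.out.println(hasCause(bare, UnknownMemberIdStandIn.class));
        System.out.println(hasCause(wrapped, UnknownMemberIdStandIn.class));
    }
}
```

The trade-off is that a chain walk could also match an `UnknownMemberIdException` buried under an unrelated failure, so it may be preferable to pin down the actual contract of `whenComplete` and test for it instead.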