C0urante commented on code in PR #12432:
URL: https://github.com/apache/kafka/pull/12432#discussion_r933464520


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -306,9 +308,18 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
 
     void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) {
         if (targetAdminClient != null) {
-            targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
-            log.trace("sync-ed the offset for consumer group: {} with {} number of offset entries",
-                      consumerGroupId, offsetToSync.size());
+            AlterConsumerGroupOffsetsResult result = targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);

Review Comment:
   Nit: would it also be useful to have a log line here indicating that we're 
attempting to sync offsets?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -306,7 +308,16 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
 
     void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) {
         if (targetAdminClient != null) {
-            targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
+            AlterConsumerGroupOffsetsResult result = targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
+            result.all().whenComplete((v, throwable) -> {
+                if (throwable != null) {
+                    if (throwable.getCause() instanceof UnknownMemberIdException) {

Review Comment:
   I took a closer look at the [`KafkaFuture` Javadocs](https://kafka.apache.org/32/javadoc/org/apache/kafka/common/KafkaFuture.html).
   
   The docs for `KafkaFuture::whenComplete` state (emphasis mine):
   
   > Returns a new KafkaFuture **with the same result or exception as this future**, that executes the given action when this future completes. When this future is done, **the given action is invoked with the result (or null if none) and the exception (or null if none) of this future as arguments**.
   
   Given this, we know that `whenComplete` receives the same exception that 
would be thrown by, e.g., `KafkaFuture::get`, if the action failed.
   
   The `get` method signatures on `KafkaFuture` declare that they throw 
`ExecutionException` if the action fails, and that exception always wraps the 
cause of failure.
   
   So, I'm convinced that we can expect this wrapping 👍 
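
If we would rather not depend on the wrapping at all, one option is a check that accepts the exception either directly or anywhere in its cause chain. Below is a minimal sketch of that idea, not a proposed patch. It assumes only the JDK: `CompletableFuture` stands in for `KafkaFuture` (whose behavior may differ), and `CauseCheck`, `causedBy`, and `StandInUnknownMemberIdException` are hypothetical names that exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical helper class for illustration; not part of Kafka.
final class CauseCheck {

    // Stand-in for org.apache.kafka.common.errors.UnknownMemberIdException,
    // so that the sketch runs without Kafka on the classpath.
    public static class StandInUnknownMemberIdException extends RuntimeException {
        public StandInUnknownMemberIdException(String message) {
            super(message);
        }
    }

    // Returns true if t itself, or any exception in its cause chain,
    // is an instance of the given type.
    public static boolean causedBy(Throwable t, Class<? extends Throwable> type) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    private static void report(String label, Throwable t) {
        System.out.println(label + ": " + t.getClass().getSimpleName()
                + ", matched=" + causedBy(t, StandInUnknownMemberIdException.class));
    }

    public static void main(String[] args) throws InterruptedException {
        // JDK CompletableFuture used as a stand-in; KafkaFuture is not exercised here.
        CompletableFuture<Void> source = new CompletableFuture<>();
        source.completeExceptionally(new StandInUnknownMemberIdException("stale member"));

        // Callback registered on the failed future itself.
        source.whenComplete((v, t) -> report("direct", t));

        // Callback registered on a dependent stage of the failed future.
        source.thenApply(v -> v).whenComplete((v, t) -> report("dependent", t));

        // Blocking get() reports failure through ExecutionException.
        try {
            source.get();
        } catch (ExecutionException e) {
            report("get", e);
        }
    }
}
```

With a helper like this, the callback would not need to know whether the throwable it receives is the failure itself or a wrapper around it, at the cost of also matching failures nested more deeply in the chain.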


