philipnee commented on code in PR #13490: URL: https://github.com/apache/kafka/pull/13490#discussion_r1169362391
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -479,18 +530,42 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) { commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs)); } + /** + * Commit the user provided offsets, blocking until the commit completes or the timeout expires. If the future is + * interrupted by wakeup, the future will be completed with an + * {@link org.apache.kafka.common.errors.WakeupException}. + * + * @param offsets offsets to commit. + * @param timeout amount of time to block for the commit to complete. + * + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. See the exception for more details + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata + * is too large or if the topic does not exist). 
+ * @throws org.apache.kafka.common.errors.TimeoutException if the timeout specified by {@code default.api.timeout.ms} expires + * before successful completion of the offset commit + */ @Override public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) { - CompletableFuture<Void> commitFuture = commit(offsets); + maybeWakeup(); + final WakeupableFuture<Void> commitFuture = commit(offsets); + activeFutures.add(commitFuture); try { commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - } catch (final TimeoutException e) { - throw new org.apache.kafka.common.errors.TimeoutException(e); - } catch (final InterruptedException e) { - throw new InterruptException(e); - } catch (final ExecutionException e) { + } catch (ExecutionException e) { throw new KafkaException(e); - } catch (final Exception e) { + } catch (InterruptedException e) { + throw new InterruptException(e); + } catch (TimeoutException e) { + throw new org.apache.kafka.common.errors.TimeoutException(e); + } catch (WakeupException e) { + this.shouldWakeup.set(false); + this.activeFutures.remove(commitFuture); Review Comment: This removal of `commitFuture` from `activeFutures` should probably be in a `finally` block, so that it runs on every exit path rather than only when a `WakeupException` is caught. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org