philipnee commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1169362391


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -479,18 +530,42 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
         commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
+    /**
+     * Commit the user provided offsets, blocking until the commit completes or the timeout expires. If the future is
+     * interrupted by wakeup, the future will be completed with an
+     * {@link org.apache.kafka.common.errors.WakeupException}.
+     *
+     * @param offsets offsets to commit.
+     * @param timeout amount of time to block for the commit to complete.
+     *
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+     *             is too large or if the topic does not exist).
+     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout specified by {@code default.api.timeout.ms} expires
+     *            before successful completion of the offset commit
+     */
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
-        CompletableFuture<Void> commitFuture = commit(offsets);
+        maybeWakeup();
+        final WakeupableFuture<Void> commitFuture = commit(offsets);
+        activeFutures.add(commitFuture);
         try {
             commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-        } catch (final TimeoutException e) {
-            throw new org.apache.kafka.common.errors.TimeoutException(e);
-        } catch (final InterruptedException e) {
-            throw new InterruptException(e);
-        } catch (final ExecutionException e) {
+        } catch (ExecutionException e) {
             throw new KafkaException(e);
-        } catch (final Exception e) {
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        } catch (TimeoutException e) {
+            throw new org.apache.kafka.common.errors.TimeoutException(e);
+        } catch (WakeupException e) {
+            this.shouldWakeup.set(false);
+            this.activeFutures.remove(commitFuture);

Review Comment:
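   This removal from `activeFutures` should probably be in a `finally` block, so the future is removed on every exit path rather than only when a `WakeupException` is caught.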
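
   To illustrate the suggestion, here is a minimal, self-contained sketch of the `finally` pattern. It is not the PR's code: `FinallyCleanupSketch`, `awaitCommit`, and `activeCount` are hypothetical names, a plain `CompletableFuture` stands in for `WakeupableFuture`, and `RuntimeException` stands in for the Kafka exception types. Only the placement of the cleanup is the point.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the consumer; names and types are assumptions.
final class FinallyCleanupSketch {
    // Plays the role of activeFutures in the diff; the real field's type is not shown there.
    private final Set<CompletableFuture<Void>> activeFutures = ConcurrentHashMap.newKeySet();

    // Blocks on the future and always unregisters it afterwards.
    public void awaitCommit(CompletableFuture<Void> commitFuture, long timeoutMs) {
        activeFutures.add(commitFuture);
        try {
            commitFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | TimeoutException e) {
            // The PR maps these to Kafka exception types; RuntimeException is a placeholder.
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            // Runs on success and on every exception path, so no future is left registered.
            activeFutures.remove(commitFuture);
        }
    }

    // Number of futures still registered; used here only to observe the cleanup.
    public int activeCount() {
        return activeFutures.size();
    }
}
```

   With the removal in `finally`, a timed-out or failed commit no longer leaves a stale entry behind. Whether the `shouldWakeup` reset belongs there as well is a separate question that this sketch does not address.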


