This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 43f603cfb7e KAFKA-19351: AsyncConsumer#commitAsync should copy the input offsets (#19855)
43f603cfb7e is described below
commit 43f603cfb7ef52d9bb84c2d3c580e8020bc4577a
Author: Lan Ding <[email protected]>
AuthorDate: Fri May 30 16:36:38 2025 +0800
KAFKA-19351: AsyncConsumer#commitAsync should copy the input offsets (#19855)
`AsyncConsumer#commitAsync` and `AsyncConsumer#commitSync` should copy
the input offsets.
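For illustration only (a minimal caller-side sketch, not part of this
patch, assuming an already-configured `KafkaConsumer<String, String>`
named `consumer` and a hypothetical topic name and offset):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition("topic", 0), new OffsetAndMetadata(42L));
    consumer.commitAsync(offsets, null);
    // Safe once the consumer copies the map internally: this mutation
    // can no longer affect the in-flight commit.
    offsets.clear();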
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/clients/consumer/KafkaConsumer.java | 12 ++++---
.../consumer/internals/AsyncKafkaConsumer.java | 8 ++---
.../consumer/internals/AsyncKafkaConsumerTest.java | 40 ++++++++++++++++++++++
3 files changed, 51 insertions(+), 9 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3a06e71335d..0e7d3d65b1f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1003,7 +1003,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
      * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
      *
-     * @param offsets A map of offsets by partition with associated metadata
+     * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it
+     *                is safe to mutate the map after returning.
      * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
      *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
      *             or if there is an active group with the same <code>group.id</code> which is using group management. In such cases,
@@ -1054,7 +1055,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
      * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
      *
-     * @param offsets A map of offsets by partition with associated metadata
+     * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it
+     *                is safe to mutate the map after returning.
      * @param timeout The maximum amount of time to await completion of the offset commit
      * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
      *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
@@ -1143,7 +1145,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * offsets committed through this API are guaranteed to complete before a subsequent call to {@link #commitSync()}
      * (and variants) returns.
      *
-     * @param offsets  A map of offsets by partition with associate metadata. This map will be copied internally, so it
+     * @param offsets  A map of offsets by partition with associated metadata. This map will be copied internally, so it
      *                 is safe to mutate the map after returning.
      * @param callback Callback to invoke when the commit completes
      * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
@@ -1563,7 +1565,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param timestampsToSearch the mapping from partition to the timestamp to look up.
      *
      * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
-     *         than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within
+     *         than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within
      *         the default timeout, and no corresponding message exists, the entry in the returned map will be {@code null}
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
@@ -1590,7 +1592,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param timeout The maximum amount of time to await retrieval of the offsets
      *
      * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
-     *         than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within
+     *         than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within
      *         timeout, and no corresponding message exists, the entry in the returned map will be {@code null}
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e301b6855c6..29843c765c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -385,7 +385,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             setGroupAssignmentSnapshot(partitions);
         }
     };
-
+
     public AsyncKafkaConsumer(final ConsumerConfig config,
                               final Deserializer<K> keyDeserializer,
                               final Deserializer<V> valueDeserializer,
@@ -927,7 +927,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        commitAsync(Optional.of(offsets), callback);
+        commitAsync(Optional.of(new HashMap<>(offsets)), callback);
     }
 
     private void commitAsync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, OffsetCommitCallback callback) {
@@ -1599,12 +1599,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        commitSync(Optional.of(offsets), defaultApiTimeoutMs);
+        commitSync(Optional.of(new HashMap<>(offsets)), defaultApiTimeoutMs);
     }
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
-        commitSync(Optional.of(offsets), timeout);
+        commitSync(Optional.of(new HashMap<>(offsets)), timeout);
     }
 
     private void commitSync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, Duration timeout) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index bb9482aa636..ec5b6b1f6f8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -357,6 +357,26 @@ public class AsyncKafkaConsumerTest {
         assertSame(exception.getClass(), callback.exception.getClass());
     }
 
+    @Test
+    public void testCommitAsyncShouldCopyOffsets() {
+        consumer = newConsumer();
+
+        TopicPartition tp = new TopicPartition("t0", 2);
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp, new OffsetAndMetadata(10L));
+
+        markOffsetsReadyForCommitEvent();
+        consumer.commitAsync(offsets, null);
+
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        assertTrue(commitEvent.offsets().isPresent());
+        assertTrue(commitEvent.offsets().get().containsKey(tp));
+        offsets.remove(tp);
+        assertTrue(commitEvent.offsets().get().containsKey(tp));
+    }
+
     private static Stream<Exception> commitExceptionSupplier() {
         return Stream.of(
                 new KafkaException("Test exception"),
@@ -590,6 +610,26 @@ public class AsyncKafkaConsumerTest {
         assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100)));
     }
 
+    @Test
+    public void testCommitSyncShouldCopyOffsets() {
+        consumer = newConsumer();
+
+        TopicPartition tp = new TopicPartition("t0", 2);
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp, new OffsetAndMetadata(10L));
+
+        completeCommitSyncApplicationEventSuccessfully();
+        consumer.commitSync(offsets);
+
+        final ArgumentCaptor<SyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(SyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final SyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        assertTrue(commitEvent.offsets().isPresent());
+        assertTrue(commitEvent.offsets().get().containsKey(tp));
+        offsets.remove(tp);
+        assertTrue(commitEvent.offsets().get().containsKey(tp));
+    }
+
     private CompletableFuture<Void> setUpConsumerWithIncompleteAsyncCommit(TopicPartition tp) {
         time = new MockTime(1);
         consumer = newConsumer();