This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b909544e996 MINOR: Improve consistency of acknowledge type terminology (#20282) b909544e996 is described below commit b909544e996cdde0972099fb57af65cd4e83dfdc Author: Andrew Schofield <aschofi...@confluent.io> AuthorDate: Fri Aug 1 21:17:22 2025 +0100 MINOR: Improve consistency of acknowledge type terminology (#20282) The code had a mixture of "acknowledgement type" and "acknowledge type". The latter is preferred. Reviewers: TengYao Chi <frankvi...@apache.org>, Lan Ding <isdin...@163.com> --- .../apache/kafka/clients/consumer/ShareConsumerTest.java | 10 +++++----- .../kafka/clients/consumer/KafkaShareConsumer.java | 2 +- .../clients/consumer/internals/Acknowledgements.java | 6 +++--- .../clients/consumer/internals/ShareCompletedFetch.java | 2 +- .../kafka/clients/consumer/internals/ShareFetch.java | 4 ++-- .../clients/consumer/internals/AcknowledgementsTest.java | 6 +++--- .../test/java/kafka/server/share/SharePartitionTest.java | 16 ++++++++-------- .../src/test/scala/unit/kafka/server/KafkaApisTest.scala | 4 ++-- .../share/acknowledge/ShareAcknowledgementBatch.java | 2 +- .../apache/kafka/server/share/fetch/InFlightBatch.java | 2 +- 10 files changed, 27 insertions(+), 27 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 957d72cea01..2335c223b07 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -362,7 +362,7 @@ public class ShareConsumerTest { return partitionOffsetsMap.containsKey(tp); }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive call to callback"); - // We expect no exception as the acknowledgment error code is 
null. + // We expect no exception as the acknowledgement error code is null. assertFalse(partitionExceptionMap.containsKey(tp)); verifyShareGroupStateTopicRecordsProduced(); } @@ -391,7 +391,7 @@ public class ShareConsumerTest { shareConsumer.poll(Duration.ofMillis(1000)); shareConsumer.close(); - // We expect no exception as the acknowledgment error code is null. + // We expect no exception as the acknowledgement error code is null. assertFalse(partitionExceptionMap.containsKey(tp)); verifyShareGroupStateTopicRecordsProduced(); } @@ -1500,7 +1500,7 @@ public class ShareConsumerTest { shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWithShareConsumer<>(shareConsumer)); shareConsumer.subscribe(Set.of(tp.topic())); - // The acknowledgment commit callback will try to call a method of ShareConsumer + // The acknowledgement commit callback will try to call a method of ShareConsumer shareConsumer.poll(Duration.ofMillis(5000)); // The second poll sends the acknowledgements implicitly. // The acknowledgement commit callback will be called and the exception is thrown. @@ -1540,14 +1540,14 @@ public class ShareConsumerTest { producer.send(record); producer.flush(); - // The acknowledgment commit callback will try to call a method of ShareConsumer + // The acknowledgement commit callback will try to call a method of ShareConsumer shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWakeup<>(shareConsumer)); shareConsumer.subscribe(Set.of(tp.topic())); TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer"); - // The second poll sends the acknowledgments implicitly. + // The second poll sends the acknowledgements implicitly. shareConsumer.poll(Duration.ofMillis(2000)); // Till now acknowledgement commit callback has not been called, so no exception thrown yet. 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java index 743c77b4228..655788e8469 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java @@ -195,7 +195,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap; * </pre> * * <h4>Per-record acknowledgement (explicit acknowledgement)</h4> - * This example demonstrates using different acknowledgement types depending on the outcome of processing the records. + * This example demonstrates using different acknowledge types depending on the outcome of processing the records. * Here the {@code share.acknowledgement.mode} property is set to "explicit" so the consumer must explicitly acknowledge each record. * <pre> * Properties props = new Properties(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java index 8d3fab23587..5bce77651b9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java @@ -185,7 +185,7 @@ public class Acknowledgements { currentBatch.acknowledgeTypes().add(ACKNOWLEDGE_TYPE_GAP); } } - List<AcknowledgementBatch> optimalBatches = maybeOptimiseAcknowledgementTypes(currentBatch); + List<AcknowledgementBatch> optimalBatches = maybeOptimiseAcknowledgeTypes(currentBatch); optimalBatches.forEach(batch -> { if (canOptimiseForSingleAcknowledgeType(batch)) { @@ -204,7 +204,7 @@ public class Acknowledgements { */ private AcknowledgementBatch maybeCreateNewBatch(AcknowledgementBatch currentBatch, Long nextOffset, List<AcknowledgementBatch> batches) { if (nextOffset != currentBatch.lastOffset() 
+ 1) { - List<AcknowledgementBatch> optimalBatches = maybeOptimiseAcknowledgementTypes(currentBatch); + List<AcknowledgementBatch> optimalBatches = maybeOptimiseAcknowledgeTypes(currentBatch); optimalBatches.forEach(batch -> { if (canOptimiseForSingleAcknowledgeType(batch)) { @@ -228,7 +228,7 @@ public class Acknowledgements { * whose count exceeds the default value. In this case, the batch is split into 2 such that the * batch with the continuous records has only 1 acknowledge type in its array. */ - private List<AcknowledgementBatch> maybeOptimiseAcknowledgementTypes(AcknowledgementBatch currentAcknowledgeBatch) { + private List<AcknowledgementBatch> maybeOptimiseAcknowledgeTypes(AcknowledgementBatch currentAcknowledgeBatch) { List<AcknowledgementBatch> batches = new ArrayList<>(); if (currentAcknowledgeBatch == null) return batches; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java index f2664050bc8..2c337782dd4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java @@ -154,7 +154,7 @@ public class ShareCompletedFetch { * @param maxRecords The number of records to return; the number returned may be {@code 0 <= maxRecords} * @param checkCrcs Whether to check the CRC of fetched records * - * @return {@link ShareInFlightBatch The ShareInFlightBatch containing records and their acknowledgments} + * @return {@link ShareInFlightBatch The ShareInFlightBatch containing records and their acknowledgements} */ <K, V> ShareInFlightBatch<K, V> fetchRecords(final Deserializers<K, V> deserializers, final int maxRecords, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java index d587e29f382..406110fe502 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java @@ -110,7 +110,7 @@ public class ShareFetch<K, V> { * Acknowledge a single record in the current batch. * * @param record The record to acknowledge - * @param type The acknowledgment type which indicates whether it was processed successfully + * @param type The acknowledge type which indicates whether it was processed successfully */ public void acknowledge(final ConsumerRecord<K, V> record, final AcknowledgeType type) { for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) { @@ -129,7 +129,7 @@ public class ShareFetch<K, V> { * @param topic The topic of the record to acknowledge * @param partition The partition of the record * @param offset The offset of the record - * @param type The acknowledgment type which indicates whether it was processed successfully + * @param type The acknowledge type which indicates whether it was processed successfully */ public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) { for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java index 779df4fb43c..b6818ab51b5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java @@ -83,7 +83,7 @@ public class AcknowledgementsTest { } @Test - public void testSingleAcknowledgementTypeExceedingLimit() { + public void 
testSingleAcknowledgeTypeExceedingLimit() { int i = 0; for (; i < maxRecordsWithSameAcknowledgeType; i++) { acks.add(i, AcknowledgeType.ACCEPT); @@ -119,7 +119,7 @@ public class AcknowledgementsTest { } @Test - public void testSingleAcknowledgementTypeWithGap() { + public void testSingleAcknowledgeTypeWithGap() { for (int i = 0; i < maxRecordsWithSameAcknowledgeType; i++) { acks.add(i, null); } @@ -186,7 +186,7 @@ public class AcknowledgementsTest { } @Test - public void testSingleAcknowledgementTypeWithinLimit() { + public void testSingleAcknowledgeTypeWithinLimit() { acks.add(0L, AcknowledgeType.ACCEPT); acks.add(1L, AcknowledgeType.ACCEPT); acks.add(2L, AcknowledgeType.ACCEPT); diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index c4fc0a0454d..c17ce391b37 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -4462,7 +4462,7 @@ public class SharePartitionTest { assertEquals(20, sharePartition.startOffset()); assertEquals(36, sharePartition.endOffset()); - // For cached state corresponding to entry 2, the batch state will be ACKNOWLEDGED, hence it will be cleared as part of acknowledgment. + // For cached state corresponding to entry 2, the batch state will be ACKNOWLEDGED, hence it will be cleared as part of acknowledgement. 
assertEquals(6, sharePartition.cachedState().size()); assertEquals(MEMBER_ID, sharePartition.cachedState().get(7L).batchMemberId()); @@ -4768,7 +4768,7 @@ public class SharePartitionTest { } @Test - public void testLsoMovementAheadOfEndOffsetPostAcknowledgment() { + public void testLsoMovementAheadOfEndOffsetPostAcknowledgement() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); @@ -4884,7 +4884,7 @@ public class SharePartitionTest { } @Test - public void testLsoMovementPostGapsInAcknowledgments() { + public void testLsoMovementPostGapsInAcknowledgements() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(2, 5); @@ -5733,7 +5733,7 @@ public class SharePartitionTest { } @Test - public void testMaybeUpdateCachedStateWhenAcknowledgementTypeAccept() { + public void testMaybeUpdateCachedStateWhenAcknowledgeTypeAccept() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250); @@ -5753,7 +5753,7 @@ public class SharePartitionTest { } @Test - public void testMaybeUpdateCachedStateWhenAcknowledgementTypeReject() { + public void testMaybeUpdateCachedStateWhenAcknowledgeTypeReject() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250); @@ -5773,7 +5773,7 @@ public class SharePartitionTest { } @Test - public void testMaybeUpdateCachedStateWhenAcknowledgementTypeRelease() { + public void testMaybeUpdateCachedStateWhenAcknowledgeTypeRelease() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 
250); @@ -5937,7 +5937,7 @@ public class SharePartitionTest { fetchAcquiredRecords(sharePartition, memoryRecords(100, 80), 100); assertFalse(sharePartition.canAcquireRecords()); - // Final Acknowledgment, all records are acknowledged here. + // Final Acknowledgement, all records are acknowledged here. sharePartition.acknowledge(MEMBER_ID, List.of( new ShareAcknowledgementBatch(50, 179, List.of((byte) 3)))); @@ -5984,7 +5984,7 @@ public class SharePartitionTest { fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10); assertTrue(sharePartition.canAcquireRecords()); - // Sending acknowledgment for the first batch from 11 to 20 + // Sending acknowledgement for the first batch from 11 to 20 sharePartition.acknowledge(MEMBER_ID, List.of( new ShareAcknowledgementBatch(11, 20, List.of((byte) 1)))); diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index d7b35de63e5..5676e454887 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -7979,12 +7979,12 @@ class KafkaApisTest extends Logging { private def compareAcknowledgementBatches(baseOffset: Long, endOffset: Long, - acknowledgementType: Byte, + acknowledgeType: Byte, acknowledgementBatch: ShareAcknowledgementBatch ): Boolean = { if (baseOffset == acknowledgementBatch.firstOffset() && endOffset == acknowledgementBatch.lastOffset() - && acknowledgementType == acknowledgementBatch.acknowledgeTypes().get(0)) { + && acknowledgeType == acknowledgementBatch.acknowledgeTypes().get(0)) { return true } false diff --git a/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java b/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java index 50aabb3903e..bc29f37c62f 100644 --- a/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java +++ 
b/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java @@ -21,7 +21,7 @@ import java.util.List; /** * The ShareAcknowledgementBatch containing the fields required to acknowledge the fetched records. - * The class abstracts the acknowledgment request for <code>SharePartition</code> class constructed + * The class abstracts the acknowledgement request for <code>SharePartition</code> class constructed * from {@link org.apache.kafka.common.message.ShareFetchRequestData.AcknowledgementBatch} and * {@link org.apache.kafka.common.message.ShareAcknowledgeRequestData.AcknowledgementBatch} classes. */ diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java index c6a16c5056e..c3e2d353328 100644 --- a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java +++ b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java @@ -48,7 +48,7 @@ public class InFlightBatch { // The offset state map is used to track the state of the records per offset. However, the // offset state map is only required when the state of the offsets within same batch are - // different. The states can be different when explicit offset acknowledgment is done which + // different. The states can be different when explicit offset acknowledgement is done which // is different from the batch state. private NavigableMap<Long, InFlightState> offsetState;