This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f737ef31d9b KAFKA-18900: Implement share.acknowledgement.mode to choose acknowledgement mode (#19417) f737ef31d9b is described below commit f737ef31d9bd5e8bbce11ffd7b9afd2e424cce3d Author: Shivsundar R <s...@confluent.io> AuthorDate: Tue Apr 15 11:38:33 2025 -0400 KAFKA-18900: Implement share.acknowledgement.mode to choose acknowledgement mode (#19417) Choose the acknowledgement mode based on the config (`share.acknowledgement.mode`) and not on the basis of how the user designs the application. - The default value of the config is `IMPLICIT`, so if any empty/null/invalid value is configured, then the mode defaults to `IMPLICIT`. - Removed AcknowledgementModes `UNKNOWN` and `PENDING` as they are no longer required. - Added code to ensure if the application has any unacknowledged records in a batch in "`explicit`" mode, then it will throw an `IllegalStateException`. The expectation is if the mode is "explicit", all the records received in that `poll()` would be acknowledged before the next call to `poll()`. - Modified the `ConsoleShareConsumer` to configure the mode to "explicit" as it was using the explicit mode of acknowledging records. Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../kafka/clients/consumer/ShareConsumerTest.java | 116 +++++++++------------ .../kafka/clients/consumer/ConsumerConfig.java | 22 ++-- .../kafka/clients/consumer/KafkaShareConsumer.java | 84 +++++---------- .../internals/ShareAcknowledgementMode.java | 116 +++++++++++++++++++++ .../consumer/internals/ShareConsumerImpl.java | 78 ++++---------- .../internals/ShareAcknowledgementModeTest.java | 67 ++++++++++++ .../consumer/internals/ShareConsumerImplTest.java | 89 +++++++++++++++- .../kafka/tools/consumer/ConsoleShareConsumer.java | 8 +- 8 files changed, 376 insertions(+), 204 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 8d9e4dbd610..e402a4344c1 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -135,6 +135,8 @@ public class ShareConsumerTest { private List<TopicPartition> sgsTopicPartitions; private static final String KEY = "content-type"; private static final String VALUE = "application/octet-stream"; + private static final String EXPLICIT = "explicit"; + private static final String IMPLICIT = "implicit"; public ShareConsumerTest(ClusterInstance cluster) { this.cluster = cluster; @@ -594,7 +596,7 @@ public class ShareConsumerTest { public void testExplicitAcknowledgeSuccess() { alterShareAutoOffsetReset("group1", "earliest"); try (Producer<byte[], byte[]> producer = createProducer(); - ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); producer.send(record); @@ -615,7 +617,7 @@ public class ShareConsumerTest { public void testExplicitAcknowledgeCommitSuccess() { alterShareAutoOffsetReset("group1", "earliest"); try (Producer<byte[], byte[]> producer = createProducer(); - ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); producer.send(record); @@ -638,7 +640,7 @@ public class ShareConsumerTest { public void testExplicitAcknowledgementCommitAsync() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (Producer<byte[], byte[]> producer = createProducer(); - ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer("group1"); + ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT)); ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer("group1")) { ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); @@ -663,15 +665,16 @@ public class ShareConsumerTest { // Acknowledging 2 out of the 3 records received via commitAsync. ConsumerRecord<byte[], byte[]> firstRecord = iterator.next(); ConsumerRecord<byte[], byte[]> secondRecord = iterator.next(); + ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next(); assertEquals(0L, firstRecord.offset()); assertEquals(1L, secondRecord.offset()); shareConsumer1.acknowledge(firstRecord); shareConsumer1.acknowledge(secondRecord); + shareConsumer1.acknowledge(thirdRecord, AcknowledgeType.RELEASE); shareConsumer1.commitAsync(); - // The 3rd record should be reassigned to 2nd consumer when it polls, kept higher wait time - // as time out for locks is 15 secs. + // The 3rd record should be reassigned to 2nd consumer when it polls. TestUtils.waitForCondition(() -> { ConsumerRecords<byte[], byte[]> records2 = shareConsumer2.poll(Duration.ofMillis(1000)); return records2.count() == 1 && records2.iterator().next().offset() == 2L; @@ -690,51 +693,16 @@ public class ShareConsumerTest { } } - @ClusterTest - public void testImplicitModeNotTriggeredByPollWhenNoAcksToSend() throws InterruptedException { - alterShareAutoOffsetReset("group1", "earliest"); - try (Producer<byte[], byte[]> producer = createProducer(); - ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { - - shareConsumer.subscribe(Set.of(tp.topic())); - - Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>(); - shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap1, Map.of())); - - // The acknowledgement mode moves to PENDING from UNKNOWN. - ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(0, records.count()); - shareConsumer.commitAsync(); - - ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - producer.send(record1); - producer.flush(); - - // The acknowledgement mode remains in PENDING because no records were returned. - records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - - // The acknowledgement mode now moves to EXPLICIT. - shareConsumer.acknowledge(records.iterator().next()); - shareConsumer.commitAsync(); - - TestUtils.waitForCondition(() -> { - shareConsumer.poll(Duration.ofMillis(500)); - return partitionOffsetsMap1.containsKey(tp); - }, 30000, 100L, () -> "Didn't receive call to callback"); - verifyShareGroupStateTopicRecordsProduced(); - } - } - @ClusterTest public void testExplicitAcknowledgementCommitAsyncPartialBatch() { alterShareAutoOffsetReset("group1", "earliest"); try (Producer<byte[], byte[]> producer = createProducer(); - ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer("group1")) { + ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record4 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); producer.send(record1); producer.send(record2); producer.send(record3); @@ -753,6 +721,7 @@ public class ShareConsumerTest { // Acknowledging 2 out of the 3 records received via commitAsync. ConsumerRecord<byte[], byte[]> firstRecord = iterator.next(); ConsumerRecord<byte[], byte[]> secondRecord = iterator.next(); + ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next(); assertEquals(0L, firstRecord.offset()); assertEquals(1L, secondRecord.offset()); @@ -760,21 +729,25 @@ public class ShareConsumerTest { shareConsumer1.acknowledge(secondRecord); shareConsumer1.commitAsync(); - // The 3rd record should be re-presented to the consumer when it polls again. - records = shareConsumer1.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - iterator = records.iterator(); - firstRecord = iterator.next(); - assertEquals(2L, firstRecord.offset()); + producer.send(record4); + producer.flush(); + + // The next poll() should throw an IllegalStateException as there is still 1 unacknowledged record. + // In EXPLICIT acknowledgement mode, we are not allowed to have unacknowledged records from a batch. + assertThrows(IllegalStateException.class, () -> shareConsumer1.poll(Duration.ofMillis(5000))); + + // Acknowledging the 3rd record + shareConsumer1.acknowledge(thirdRecord); + shareConsumer1.commitAsync(); - // And poll again without acknowledging - the callback will receive the acknowledgement responses too + // The next poll() will not throw an exception, it would continue to fetch more records. records = shareConsumer1.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); iterator = records.iterator(); - firstRecord = iterator.next(); - assertEquals(2L, firstRecord.offset()); + ConsumerRecord<byte[], byte[]> fourthRecord = iterator.next(); + assertEquals(3L, fourthRecord.offset()); - shareConsumer1.acknowledge(firstRecord); + shareConsumer1.acknowledge(fourthRecord); // The callback will receive the acknowledgement responses after polling. The callback is // called on entry to the poll method or during close. The commit is being performed asynchronously, so @@ -784,6 +757,7 @@ public class ShareConsumerTest { shareConsumer1.close(); assertFalse(partitionExceptionMap.containsKey(tp)); + assertTrue(partitionOffsetsMap.containsKey(tp) && partitionOffsetsMap.get(tp).size() == 4); verifyShareGroupStateTopicRecordsProduced(); } } @@ -792,7 +766,7 @@ public class ShareConsumerTest { public void testExplicitAcknowledgeReleasePollAccept() { alterShareAutoOffsetReset("group1", "earliest"); try (Producer<byte[], byte[]> producer = createProducer(); - ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); producer.send(record); @@ -815,7 +789,7 @@ public class ShareConsumerTest { public void testExplicitAcknowledgeReleaseAccept() { alterShareAutoOffsetReset("group1", "earliest"); try (Producer<byte[], byte[]> producer = createProducer(); - ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); producer.send(record); @@ -835,7 +809,7 @@ public class ShareConsumerTest { public void testExplicitAcknowledgeReleaseClose() { alterShareAutoOffsetReset("group1", "earliest"); try (Producer<byte[], byte[]> producer = createProducer(); - ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); producer.send(record); @@ -853,7 +827,7 @@ public class ShareConsumerTest { public void testExplicitAcknowledgeThrowsNotInBatch() { alterShareAutoOffsetReset("group1", "earliest"); try (Producer<byte[], byte[]> producer = createProducer(); - ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); producer.send(record); @@ -970,7 +944,7 @@ public class ShareConsumerTest { try (Producer<byte[], byte[]> producer = createProducer(); ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer( "group1", - Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"))) { + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"))) { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); producer.send(record); @@ -995,7 +969,7 @@ public class ShareConsumerTest { try (Producer<byte[], byte[]> producer = createProducer(); ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer( "group1", - Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "implicit"))) { + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, IMPLICIT))) { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); producer.send(record); @@ -2069,7 +2043,7 @@ public class ShareConsumerTest { cluster.bootstrapServers(), topicName, groupId, - Map.of() + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT) ); service.schedule( @@ -2145,7 +2119,8 @@ public class ShareConsumerTest { alterShareAutoOffsetReset("group1", "earliest"); alterShareIsolationLevel("group1", "read_uncommitted"); try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1"); - ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of( + ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { shareConsumer.subscribe(Set.of(tp.topic())); transactionalProducer.initTransactions(); try { @@ -2231,7 +2206,8 @@ public class ShareConsumerTest { alterShareAutoOffsetReset("group1", "earliest"); alterShareIsolationLevel("group1", "read_committed"); try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1"); - ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of( + ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { shareConsumer.subscribe(Set.of(tp.topic())); transactionalProducer.initTransactions(); @@ -2282,7 +2258,11 @@ public class ShareConsumerTest { // Wait for the aborted marker offset for Message 4 (7L) to be fetched and acknowledged by the consumer. TestUtils.waitForCondition(() -> { - shareConsumer.poll(Duration.ofMillis(500)); + ConsumerRecords<byte[], byte[]> pollRecords = shareConsumer.poll(Duration.ofMillis(500)); + if (pollRecords.count() > 0) { + // We will release Message 3 again if it was received in this poll(). + pollRecords.forEach(consumerRecord -> shareConsumer.acknowledge(consumerRecord, AcknowledgeType.RELEASE)); + } return partitionOffsetsMap2.containsKey(tp) && partitionOffsetsMap2.get(tp).contains(7L); }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume abort transaction marker offset for Message 4"); @@ -2299,7 +2279,7 @@ public class ShareConsumerTest { produceCommittedTransaction(transactionalProducer, "Message 8"); // Since isolation level is READ_UNCOMMITTED, we can consume Message 3 (committed transaction that was released), Message 5, Message 6, Message 7 and Message 8. - List<String> finalMessages = new ArrayList<>(); + Set<String> finalMessages = new HashSet<>(); TestUtils.waitForCondition(() -> { ConsumerRecords<byte[], byte[]> pollRecords = shareConsumer.poll(Duration.ofMillis(5000)); if (pollRecords.count() > 0) { @@ -2311,11 +2291,8 @@ public class ShareConsumerTest { return finalMessages.size() == 5; }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all records post altering share isolation level"); - assertEquals("Message 3", finalMessages.get(0)); - assertEquals("Message 5", finalMessages.get(1)); - assertEquals("Message 6", finalMessages.get(2)); - assertEquals("Message 7", finalMessages.get(3)); - assertEquals("Message 8", finalMessages.get(4)); + Set<String> expected = Set.of("Message 3", "Message 5", "Message 6", "Message 7", "Message 8"); + assertEquals(expected, finalMessages); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { @@ -2330,7 +2307,8 @@ public class ShareConsumerTest { alterShareAutoOffsetReset("group1", "earliest"); alterShareIsolationLevel("group1", "read_committed"); try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1"); - ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of( + ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { shareConsumer.subscribe(Set.of(tp.topic())); transactionalProducer.initTransactions(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index e5917d9d593..68fe89a4147 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.MetadataRecoveryStrategy; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -381,13 +382,10 @@ public class ConsumerConfig extends AbstractConfig { private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC; /** - * <code>share.acknowledgement.mode</code> is being evaluated as a new configuration to control the acknowledgement mode - * for share consumers. It will be removed or converted to a proper configuration before release. - * An alternative being considered is <code>enable.explicit.share.acknowledgement</code> as a boolean configuration. + * <code>share.acknowledgement.mode</code> */ - public static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = "internal.share.acknowledgement.mode"; - private static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the acknowledgement mode for a share consumer." + - " If unset, the acknowledgement mode of the consumer is decided by the method calls it uses to fetch and commit." + + public static final String SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = "share.acknowledgement.mode"; + private static final String SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the acknowledgement mode for a share consumer." + " If set to <code>implicit</code>, the acknowledgement mode of the consumer is implicit and it must not" + " use <code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to acknowledge delivery of records. Instead," + " delivery is acknowledged implicitly on the next call to poll or commit." + @@ -401,7 +399,7 @@ public class ConsumerConfig extends AbstractConfig { */ private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = List.of( GROUP_REMOTE_ASSIGNOR_CONFIG, - INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG + SHARE_ACKNOWLEDGEMENT_MODE_CONFIG ); /** @@ -411,7 +409,7 @@ public class ConsumerConfig extends AbstractConfig { PARTITION_ASSIGNMENT_STRATEGY_CONFIG, HEARTBEAT_INTERVAL_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG, - INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG + SHARE_ACKNOWLEDGEMENT_MODE_CONFIG ); static { @@ -695,12 +693,12 @@ public class ConsumerConfig extends AbstractConfig { atLeast(0), Importance.LOW, CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC) - .define(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, + .define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, Type.STRING, - null, - in(null, "implicit", "explicit"), + ShareAcknowledgementMode.IMPLICIT.name(), + new ShareAcknowledgementMode.Validator(), Importance.MEDIUM, - ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC); + ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_DOC); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java index d694984db57..b5a862f239d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java @@ -116,33 +116,33 @@ import static org.apache.kafka.common.utils.Utils.propsToMap; * {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically * releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures. * <p> - * The consumer can choose to use implicit or explicit acknowledgement of the records it processes. - * <p>If the application calls {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch, - * it is using <em>explicit acknowledgement</em>. In this case: - * <ul> - * <li>The application calls {@link #commitSync()} or {@link #commitAsync()} which commits the acknowledgements to Kafka. - * If any records in the batch were not acknowledged, they remain acquired and will be presented to the application - * in response to a future poll.</li> - * <li>The application calls {@link #poll(Duration)} without committing first, which commits the acknowledgements to - * Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. - * If any records in the batch were not acknowledged, they remain acquired and will be presented to the application - * in response to a future poll.</li> - * <li>The application calls {@link #close()} which attempts to commit any pending acknowledgements and - * releases any remaining acquired records.</li> - * </ul> - * If the application does not call {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch, - * it is using <em>implicit acknowledgement</em>. In this case: + * The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the + * consumer {@code share.acknowledgement.mode} property. + * <p> + * If the application sets the property to "implicit" or does not set it at all, then the consumer is using + * <em>implicit acknowledgement</em>. In this mode, the application acknowledges delivery by: * <ul> - * <li>The application calls {@link #commitSync()} or {@link #commitAsync()} which implicitly acknowledges all of - * the delivered records as processed successfully and commits the acknowledgements to Kafka.</li> - * <li>The application calls {@link #poll(Duration)} without committing, which also implicitly acknowledges all of + * <li>Calling {@link #poll(Duration)} without committing, which also implicitly acknowledges all * the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is * thrown by a failure to commit the acknowledgements.</li> - * <li>The application calls {@link #close()} which releases any acquired records without acknowledgement.</li> + * <li>Calling {@link #commitSync()} or {@link #commitAsync()} which implicitly acknowledges all + * the delivered records as processed successfully and commits the acknowledgements to Kafka.</li> + * <li>Calling {@link #close()} which releases any acquired records without acknowledgement.</li> + * </ul> + * If the application sets the property to "explicit", then the consumer is using <em>explicit acknowledgment</em>. + * The application must acknowledge all records returned from {@link #poll(Duration)} using + * {@link #acknowledge(ConsumerRecord, AcknowledgeType)} before its next call to {@link #poll(Duration)}. + * If the application calls {@link #poll(Duration)} without having acknowledged all records, an + * {@link IllegalStateException} is thrown. The remaining unacknowledged records can still be acknowledged. + * In this mode, the application acknowledges delivery by: + * <ul> + * <li>Calling {@link #poll(Duration)} after it has acknowledged all records, which commits the acknowledgements + * to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.</li> + * <li>Calling {@link #commitSync()} or {@link #commitAsync()} which commits any pending + * acknowledgements to Kafka.</li> + * <li>Calling {@link #close()} which attempts to commit any pending acknowledgements and releases + * any remaining acquired records.</li> * </ul> - * <p>The consumer can optionally use the {@code internal.share.acknowledgement.mode} configuration property to choose - * between implicit and explicit acknowledgement, specifying <code>"implicit"</code> or <code>"explicit"</code> as required. - * <p> * The consumer guarantees that the records returned in the {@code ConsumerRecords} object for a specific topic-partition * are in order of increasing offset. For each topic-partition, Kafka guarantees that acknowledgements for the records * in a batch are performed atomically. This makes error handling significantly more straightforward because there can be @@ -195,12 +195,14 @@ import static org.apache.kafka.common.utils.Utils.propsToMap; * * <h4>Per-record acknowledgement (explicit acknowledgement)</h4> * This example demonstrates using different acknowledgement types depending on the outcome of processing the records. + * Here the {@code share.acknowledgement.mode} property is set to "explicit" so the consumer must explicitly acknowledge each record. * <pre> * Properties props = new Properties(); * props.setProperty("bootstrap.servers", "localhost:9092"); * props.setProperty("group.id", "test"); * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * props.setProperty("share.acknowledgement.mode", "explicit"); * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props); * consumer.subscribe(Arrays.asList("foo")); * while (true) { @@ -227,42 +229,6 @@ import static org.apache.kafka.common.utils.Utils.propsToMap; * It is only once {@link #commitSync()} is called that the acknowledgements are committed by sending the new state * information to Kafka. * - * <h4>Per-record acknowledgement, ending processing of the batch on an error (explicit acknowledgement)</h4> - * This example demonstrates ending processing of a batch of records on the first error. - * <pre> - * Properties props = new Properties(); - * props.setProperty("bootstrap.servers", "localhost:9092"); - * props.setProperty("group.id", "test"); - * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props); - * consumer.subscribe(Arrays.asList("foo")); - * while (true) { - * ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); - * for (ConsumerRecord<String, String> record : records) { - * try { - * doProcessing(record); - * consumer.acknowledge(record, AcknowledgeType.ACCEPT); - * } catch (Exception e) { - * consumer.acknowledge(record, AcknowledgeType.REJECT); - * break; - * } - * } - * consumer.commitSync(); - * } - * </pre> - * There are the following cases in this example: - * <ol> - * <li>The batch contains no records, in which case the application just polls again. The call to {@link #commitSync()} - * just does nothing because the batch was empty.</li> - * <li>All of the records in the batch are processed successfully. The calls to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} - * specifying {@code AcknowledgeType.ACCEPT} mark all records in the batch as successfully processed.</li> - * <li>One of the records encounters an exception. The call to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} specifying - * {@code AcknowledgeType.REJECT} rejects that record. Earlier records in the batch have already been marked as successfully - * processed. The call to {@link #commitSync()} commits the acknowledgements, but the records after the failed record - * remain acquired as part of the same delivery attempt and will be presented to the application in response to another poll.</li> - * </ol> - * * <h3>Reading Transactional Records</h3> * The way that share groups handle transactional records is controlled by the {@code group.share.isolation.level}</code> * configuration property. In a share group, the isolation level applies to the entire share group, not just individual diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java new file mode 100644 index 00000000000..0f8effa30a1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Objects; +import java.util.stream.Collectors; + +public class ShareAcknowledgementMode { + public enum AcknowledgementMode { + IMPLICIT, EXPLICIT; + + @Override + public String toString() { + return super.toString().toLowerCase(Locale.ROOT); + } + } + + private final AcknowledgementMode acknowledgementMode; + + public static final ShareAcknowledgementMode IMPLICIT = new ShareAcknowledgementMode(AcknowledgementMode.IMPLICIT); + public static final ShareAcknowledgementMode EXPLICIT = new ShareAcknowledgementMode(AcknowledgementMode.EXPLICIT); + + private ShareAcknowledgementMode(AcknowledgementMode acknowledgementMode) { + this.acknowledgementMode = acknowledgementMode; + } + + /** + * Returns the ShareAcknowledgementMode from the given string. + */ + public static ShareAcknowledgementMode fromString(String acknowledgementMode) { + if (acknowledgementMode == null) { + throw new IllegalArgumentException("Acknowledgement mode is null"); + } + + if (Arrays.asList(Utils.enumOptions(AcknowledgementMode.class)).contains(acknowledgementMode)) { + AcknowledgementMode mode = AcknowledgementMode.valueOf(acknowledgementMode.toUpperCase(Locale.ROOT)); + switch (mode) { + case IMPLICIT: + return IMPLICIT; + case EXPLICIT: + return EXPLICIT; + default: + throw new IllegalArgumentException("Invalid acknowledgement mode: " + acknowledgementMode); + } + } else { + throw new IllegalArgumentException("Invalid acknowledgement mode: " + acknowledgementMode); + } + } + + /** + * Returns the name of the acknowledgement mode. + */ + public String name() { + return acknowledgementMode.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ShareAcknowledgementMode that = (ShareAcknowledgementMode) o; + return acknowledgementMode == that.acknowledgementMode; + } + + @Override + public int hashCode() { + return Objects.hash(acknowledgementMode); + } + + @Override + public String toString() { + return "ShareAcknowledgementMode{" + + "mode=" + acknowledgementMode + + '}'; + } + + public static class Validator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + String acknowledgementMode = (String) value; + try { + fromString(acknowledgementMode); + } catch (Exception e) { + throw new ConfigException(name, value, "Invalid value `" + acknowledgementMode + "` for configuration " + + name + ". The value must either be 'implicit' or 'explicit'."); + } + } + + @Override + public String toString() { + String values = Arrays.stream(ShareAcknowledgementMode.AcknowledgementMode.values()) + .map(ShareAcknowledgementMode.AcknowledgementMode::toString).collect(Collectors.joining(", ")); + return "[" + values + "]"; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index c77443e39a1..bb5193f8dc6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -176,20 +176,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> { private ShareFetch<K, V> currentFetch; private AcknowledgementCommitCallbackHandler acknowledgementCommitCallbackHandler; private final List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements; - - private enum AcknowledgementMode { - /** Acknowledgement mode is not yet known */ - UNKNOWN, - /** Acknowledgement mode is pending, meaning that {@link #poll(Duration)} has been called once and - * {@link #acknowledge(ConsumerRecord, AcknowledgeType)} has not been called */ - PENDING, - /** Acknowledgements are explicit, using {@link #acknowledge(ConsumerRecord, AcknowledgeType)} */ - EXPLICIT, - /** Acknowledgements are implicit, not using {@link #acknowledge(ConsumerRecord, AcknowledgeType)} */ - IMPLICIT - } - - private AcknowledgementMode acknowledgementMode; + private final ShareAcknowledgementMode acknowledgementMode; /** * A thread-safe {@link ShareFetchBuffer fetch buffer} for the results that are populated in the @@ -457,7 +444,8 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> { final ConsumerMetadata metadata, final int requestTimeoutMs, final int defaultApiTimeoutMs, - final String groupId) { + final String groupId, + final String acknowledgementModeConfig) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.clientId = clientId; @@ -472,7 +460,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> { this.metadata = metadata; this.requestTimeoutMs = requestTimeoutMs; this.defaultApiTimeoutMs = defaultApiTimeoutMs; - this.acknowledgementMode = initializeAcknowledgementMode(null, log); + this.acknowledgementMode = ShareAcknowledgementMode.fromString(acknowledgementModeConfig); this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics); this.currentFetch = ShareFetch.empty(); this.applicationEventHandler = applicationEventHandler; @@ -582,7 +570,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> { handleCompletedAcknowledgements(); // If using implicit acknowledgement, acknowledge the previously fetched records - acknowledgeBatchIfImplicitAcknowledgement(true); + acknowledgeBatchIfImplicitAcknowledgement(); kafkaShareConsumerMetrics.recordPollStart(timer.currentTimeMs()); @@ -674,6 +662,10 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> { // Notify the network thread to wake up and start the next round of fetching applicationEventHandler.wakeupNetworkThread(); } + if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) { + // We cannot leave unacknowledged records in EXPLICIT acknowledgement mode, so we throw an exception to the application. + throw new IllegalStateException("All records must be acknowledged in explicit acknowledgement mode."); + } return currentFetch; } } @@ -719,7 +711,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> { handleCompletedAcknowledgements(); // If using implicit acknowledgement, acknowledge the previously fetched records - acknowledgeBatchIfImplicitAcknowledgement(false); + acknowledgeBatchIfImplicitAcknowledgement(); Timer requestTimer = time.timer(timeout.toMillis()); Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = acknowledgementsToSend(); @@ -763,7 +755,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> { handleCompletedAcknowledgements(); // If using implicit acknowledgement, acknowledge the previously fetched records - acknowledgeBatchIfImplicitAcknowledgement(false); + acknowledgeBatchIfImplicitAcknowledgement(); Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = acknowledgementsToSend(); if (!acknowledgementsMap.isEmpty()) { @@ -1040,29 +1032,11 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> { } /** - * Called to progressively move the acknowledgement mode into IMPLICIT if it is not known to be EXPLICIT. * If the acknowledgement mode is IMPLICIT, acknowledges all records in the current batch. - * - * @param calledOnPoll If true, called on poll. Otherwise, called on commit. */ - private void acknowledgeBatchIfImplicitAcknowledgement(boolean calledOnPoll) { - if (calledOnPoll) { - if (acknowledgementMode == AcknowledgementMode.UNKNOWN) { - // The first call to poll(Duration) moves into PENDING - acknowledgementMode = AcknowledgementMode.PENDING; - } else if (acknowledgementMode == AcknowledgementMode.PENDING && !currentFetch.isEmpty()) { - // If there are records to acknowledge and PENDING, moves into IMPLICIT - acknowledgementMode = AcknowledgementMode.IMPLICIT; - } - } else { - // If there are records to acknowledge and PENDING, moves into IMPLICIT - if (acknowledgementMode == AcknowledgementMode.PENDING && !currentFetch.isEmpty()) { - acknowledgementMode = AcknowledgementMode.IMPLICIT; - } - } - + private void acknowledgeBatchIfImplicitAcknowledgement() { // If IMPLICIT, acknowledge all records - if (acknowledgementMode == AcknowledgementMode.IMPLICIT) { + if (acknowledgementMode == ShareAcknowledgementMode.IMPLICIT) { currentFetch.acknowledgeAll(AcknowledgeType.ACCEPT); } } @@ -1075,36 +1049,20 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> { } /** - * Called to move the acknowledgement mode into EXPLICIT, if it is not known to be IMPLICIT. + * Called to verify if the acknowledgement mode is EXPLICIT, else throws an exception. */ private void ensureExplicitAcknowledgement() { - if (acknowledgementMode == AcknowledgementMode.PENDING) { - // If poll(Duration) has been called once, moves into EXPLICIT - acknowledgementMode = AcknowledgementMode.EXPLICIT; - } else if (acknowledgementMode == AcknowledgementMode.IMPLICIT) { + if (acknowledgementMode == ShareAcknowledgementMode.IMPLICIT) { throw new IllegalStateException("Implicit acknowledgement of delivery is being used."); - } else if (acknowledgementMode == AcknowledgementMode.UNKNOWN) { - throw new IllegalStateException("Acknowledge called before poll."); } } /** * Initializes the acknowledgement mode based on the configuration. */ - private static AcknowledgementMode initializeAcknowledgementMode(ConsumerConfig config, Logger log) { - if (config == null) { - return AcknowledgementMode.UNKNOWN; - } - String acknowledgementModeStr = config.getString(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG); - if ((acknowledgementModeStr == null) || acknowledgementModeStr.isEmpty()) { - return AcknowledgementMode.UNKNOWN; - } else if (acknowledgementModeStr.equalsIgnoreCase("implicit")) { - return AcknowledgementMode.IMPLICIT; - } else if (acknowledgementModeStr.equalsIgnoreCase("explicit")) { - return AcknowledgementMode.EXPLICIT; - } - log.warn("Invalid value for config {}: \"{}\"", ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, acknowledgementModeStr); - return AcknowledgementMode.UNKNOWN; + private static ShareAcknowledgementMode initializeAcknowledgementMode(ConsumerConfig config, Logger log) { + String s = config.getString(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG); + return ShareAcknowledgementMode.fromString(s); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementModeTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementModeTest.java new file mode 100644 index 00000000000..1ae0df0634d --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementModeTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.config.ConfigException; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ShareAcknowledgementModeTest { + + @Test + public void testFromString() { + assertEquals(ShareAcknowledgementMode.IMPLICIT, ShareAcknowledgementMode.fromString("implicit")); + assertEquals(ShareAcknowledgementMode.EXPLICIT, ShareAcknowledgementMode.fromString("explicit")); + assertThrows(IllegalArgumentException.class, () -> ShareAcknowledgementMode.fromString("invalid")); + assertThrows(IllegalArgumentException.class, () -> ShareAcknowledgementMode.fromString("IMPLICIT")); + assertThrows(IllegalArgumentException.class, () -> ShareAcknowledgementMode.fromString("EXPLICIT")); + assertThrows(IllegalArgumentException.class, () -> ShareAcknowledgementMode.fromString("")); + assertThrows(IllegalArgumentException.class, () -> ShareAcknowledgementMode.fromString(null)); + } + + @Test + public void testValidator() { + ShareAcknowledgementMode.Validator validator = new ShareAcknowledgementMode.Validator(); + assertDoesNotThrow(() -> validator.ensureValid("test", "implicit")); + assertDoesNotThrow(() -> validator.ensureValid("test", "explicit")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "invalid")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "IMPLICIT")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "EXPLICIT")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", null)); + } + + @Test + public void testEqualsAndHashCode() { + ShareAcknowledgementMode mode1 = ShareAcknowledgementMode.IMPLICIT; + ShareAcknowledgementMode mode2 = ShareAcknowledgementMode.IMPLICIT; + ShareAcknowledgementMode mode3 = ShareAcknowledgementMode.EXPLICIT; + + assertEquals(mode1, mode2); + assertNotEquals(mode1, mode3); + assertNotEquals(mode2, mode3); + + assertEquals(mode1.hashCode(), mode2.hashCode()); + assertNotEquals(mode1.hashCode(), mode3.hashCode()); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index 342540c5885..64c729730fb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; @@ -56,6 +57,7 @@ import org.mockito.Mockito; import java.time.Duration; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -145,14 +147,16 @@ public class ShareConsumerImplTest { mock(ShareFetchBuffer.class), subscriptions, "group-id", - "client-id"); + "client-id", + "implicit"); } private ShareConsumerImpl<String, String> newConsumer( ShareFetchBuffer fetchBuffer, SubscriptionState subscriptions, String groupId, - String clientId + String clientId, + String acknowledgementMode ) { final int defaultApiTimeoutMs = 1000; final int requestTimeoutMs = 30000; @@ -173,7 +177,8 @@ public class ShareConsumerImplTest { metadata, requestTimeoutMs, defaultApiTimeoutMs, - groupId + groupId, + acknowledgementMode ); } @@ -345,6 +350,84 @@ public class ShareConsumerImplTest { assertDoesNotThrow(() -> consumer.close()); } + @Test + public void testExplicitModeUnacknowledgedRecords() { + // Setup consumer with explicit acknowledgement mode + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); + consumer = newConsumer( + mock(ShareFetchBuffer.class), + subscriptions, + "group-id", + "client-id", + "explicit"); + + // Setup test data + String topic = "test-topic"; + int partition = 0; + TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), partition, topic); + ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0, tip); + batch.addRecord(new ConsumerRecord<>(topic, partition, 0, "key1", "value1")); + batch.addRecord(new ConsumerRecord<>(topic, partition, 1, "key2", "value2")); + + // Setup first fetch to return records + ShareFetch<String, String> firstFetch = ShareFetch.empty(); + firstFetch.add(tip, batch); + doReturn(firstFetch) + .doReturn(ShareFetch.empty()) + .when(fetchCollector) + .collect(any(ShareFetchBuffer.class)); + + // Setup subscription + List<String> topics = Collections.singletonList(topic); + completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, topics); + consumer.subscribe(topics); + + // First poll should succeed and return records + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); + assertEquals(2, records.count(), "Should have received 2 records"); + + // Second poll should fail because records weren't acknowledged + IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> consumer.poll(Duration.ofMillis(100)) + ); + assertTrue( + exception.getMessage().contains("All records must be acknowledged in explicit acknowledgement mode."), + "Unexpected error message: " + exception.getMessage() + ); + + // Verify that acknowledging one record but not all still throws exception + Iterator<ConsumerRecord<String, String>> iterator = records.iterator(); + consumer.acknowledge(iterator.next()); + exception = assertThrows( + IllegalStateException.class, + () -> consumer.poll(Duration.ofMillis(100)) + ); + assertTrue( + exception.getMessage().contains("All records must be acknowledged in explicit acknowledgement mode."), + "Unexpected error message: " + exception.getMessage() + ); + + // Verify that after acknowledging all records, poll succeeds + consumer.acknowledge(iterator.next()); + + // Setup second fetch to return new records + ShareFetch<String, String> secondFetch = ShareFetch.empty(); + ShareInFlightBatch<String, String> newBatch = new ShareInFlightBatch<>(2, tip); + newBatch.addRecord(new ConsumerRecord<>(topic, partition, 2, "key3", "value3")); + newBatch.addRecord(new ConsumerRecord<>(topic, partition, 3, "key4", "value4")); + secondFetch.add(tip, newBatch); + + // Reset mock to return new records + doReturn(secondFetch) + .when(fetchCollector) + .collect(any(ShareFetchBuffer.class)); + + // Verify that poll succeeds and returns new records + ConsumerRecords<String, String> newRecords = consumer.poll(Duration.ofMillis(100)); + assertEquals(2, newRecords.count(), "Should have received 2 new records"); + } + @Test public void testCloseWithTopicAuthorizationException() { SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java index 699397f9a70..43ba5185dc0 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java @@ -17,6 +17,7 @@ package org.apache.kafka.tools.consumer; import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaShareConsumer; import org.apache.kafka.clients.consumer.ShareConsumer; @@ -36,6 +37,7 @@ import java.time.Duration; import java.util.Collections; import java.util.Iterator; import java.util.Optional; +import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -66,7 +68,11 @@ public class ConsoleShareConsumer { messageCount = 0; long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE; - ShareConsumer<byte[], byte[]> consumer = new KafkaShareConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); + Properties consumerProps = opts.consumerProps(); + // Set share acknowledgement mode to explicit. + consumerProps.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); + + ShareConsumer<byte[], byte[]> consumer = new KafkaShareConsumer<>(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer()); ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts.topicArg(), consumer, timeoutMs); addShutdownHook(consumerWrapper);