This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 1da30bdedf4  KAFKA-18900: Experimental share consumer acknowledge mode config (#19113)
1da30bdedf4 is described below

commit 1da30bdedf457b507ed9879a78e31c9e4c74026c
Author: Andrew Schofield <aschofi...@confluent.io>
AuthorDate: Thu Mar 6 17:57:11 2025 +0000

    KAFKA-18900: Experimental share consumer acknowledge mode config (#19113)

    User testing of the `KafkaShareConsumer` interface has revealed some areas
    which confuse people. One of these is the way that it decides whether you
    want to use implicit or explicit acknowledgement of records by observing
    which calls the application issues. We are taking the opportunity to refine
    the interface before it is finalised.

    This PR introduces an experimental configuration called
    `internal.share.acknowledgement.mode` which can be used to make the
    application declare which kind of acknowledgement it wishes to use. We plan
    to try out the configuration, assess whether it has helped, and then create
    a proper consumer configuration that makes this area better. That would
    require a lot of change in the tests, which explains why this initial PR
    only has a small number of tests.

    Reviewers: David Arthur <mum...@gmail.com>
---
 .../kafka/clients/consumer/AcknowledgeType.java    |   4 +
 .../kafka/clients/consumer/ConsumerConfig.java     |  41 +++--
 .../kafka/clients/consumer/KafkaShareConsumer.java |   2 +
 .../kafka/clients/consumer/ShareConsumer.java      |   2 +
 .../clients/consumer/ShareConsumerConfig.java      |  15 +-
 .../consumer/internals/ShareConsumerImpl.java      |  24 ++-
 .../java/kafka/test/api/ShareConsumerTest.java     | 186 +++++++++++++--------
 7 files changed, 184 insertions(+), 90 deletions(-)
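For illustration, a minimal sketch (not part of the commit) of how an application might opt into explicit acknowledgement with the experimental configuration. The bootstrap address, group id and topic name are placeholder assumptions; the client classes and the config key come from the changes below.

    import java.time.Duration;
    import java.util.Properties;
    import java.util.Set;

    import org.apache.kafka.clients.consumer.AcknowledgeType;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaShareConsumer;

    public class ExplicitAckSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
            props.put("group.id", "my-share-group");            // placeholder share group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Declare the acknowledgement mode up front instead of letting the
            // consumer infer it from which calls the application issues.
            props.put("internal.share.acknowledgement.mode", "explicit");

            try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
                consumer.subscribe(Set.of("my-topic"));          // placeholder topic
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    // In explicit mode, each delivered record must be acknowledged
                    // individually; ACCEPT marks it as consumed successfully.
                    consumer.acknowledge(record, AcknowledgeType.ACCEPT);
                }
                // Commits the acknowledgements. In implicit mode, this same call would
                // instead implicitly acknowledge everything returned by poll().
                consumer.commitSync();
            }
        }
    }

The two tests added to ShareConsumerTest.java below exercise both settings: explicit acknowledgement followed by commitSync() succeeds, while calling acknowledge() on a consumer configured as implicit throws IllegalStateException.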
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
index 14b5415c2a4..b42bc135363 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
@@ -20,6 +20,10 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Locale;
 
+/**
+ * The acknowledge type is used with {@link KafkaShareConsumer#acknowledge(ConsumerRecord, AcknowledgeType)} to indicate
+ * whether the record was consumed successfully.
+ */
 @InterfaceStability.Evolving
 public enum AcknowledgeType {
     /** The record was consumed successfully. */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index d9cee3ec179..e5917d9d593 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -380,13 +380,28 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
     private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
 
+    /**
+     * <code>share.acknowledgement.mode</code> is being evaluated as a new configuration to control the acknowledgement mode
+     * for share consumers. It will be removed or converted to a proper configuration before release.
+     * An alternative being considered is <code>enable.explicit.share.acknowledgement</code> as a boolean configuration.
+     */
+    public static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = "internal.share.acknowledgement.mode";
+    private static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the acknowledgement mode for a share consumer." +
+            " If unset, the acknowledgement mode of the consumer is decided by the method calls it uses to fetch and commit." +
+            " If set to <code>implicit</code>, the acknowledgement mode of the consumer is implicit and it must not" +
+            " use <code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to acknowledge delivery of records. Instead," +
+            " delivery is acknowledged implicitly on the next call to poll or commit." +
+            " If set to <code>explicit</code>, the acknowledgement mode of the consumer is explicit and it must use" +
+            " <code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to acknowledge delivery of records.";
+
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
 
     /**
      * A list of configuration keys not supported for CLASSIC protocol.
      */
-    private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = Collections.singletonList(
-            GROUP_REMOTE_ASSIGNOR_CONFIG
+    private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = List.of(
+            GROUP_REMOTE_ASSIGNOR_CONFIG,
+            INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
     );
 
     /**
@@ -395,7 +410,8 @@ public class ConsumerConfig extends AbstractConfig {
     private static final List<String> CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS = List.of(
             PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
             HEARTBEAT_INTERVAL_MS_CONFIG,
-            SESSION_TIMEOUT_MS_CONFIG
+            SESSION_TIMEOUT_MS_CONFIG,
+            INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
     );
 
     static {
@@ -678,8 +694,13 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
                                         atLeast(0),
                                         Importance.LOW,
-                                        CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
-
+                                        CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
+                                .define(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
+                                        Type.STRING,
+                                        null,
+                                        in(null, "implicit", "explicit"),
+                                        Importance.MEDIUM,
+                                        ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC);
     }
 
     @Override
@@ -689,7 +710,7 @@ public class ConsumerConfig extends AbstractConfig {
         Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
         maybeOverrideClientId(refinedConfigs);
         maybeOverrideEnableAutoCommit(refinedConfigs);
-        checkUnsupportedConfigs();
+        checkUnsupportedConfigsPostProcess();
         return refinedConfigs;
     }
 
@@ -736,16 +757,16 @@ public class ConsumerConfig extends AbstractConfig {
         }
     }
 
-    private void checkUnsupportedConfigs() {
+    protected void checkUnsupportedConfigsPostProcess() {
         String groupProtocol = getString(GROUP_PROTOCOL_CONFIG);
         if (GroupProtocol.CLASSIC.name().equalsIgnoreCase(groupProtocol)) {
-            checkUnsupportedConfigs(GroupProtocol.CLASSIC, CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS);
+            checkUnsupportedConfigsPostProcess(GroupProtocol.CLASSIC, CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS);
         } else if (GroupProtocol.CONSUMER.name().equalsIgnoreCase(groupProtocol)) {
-            checkUnsupportedConfigs(GroupProtocol.CONSUMER, CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS);
+            checkUnsupportedConfigsPostProcess(GroupProtocol.CONSUMER, CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS);
         }
     }
 
-    private void checkUnsupportedConfigs(GroupProtocol groupProtocol, List<String> unsupportedConfigs) {
+    private void checkUnsupportedConfigsPostProcess(GroupProtocol groupProtocol,
+                                                    List<String> unsupportedConfigs) {
         if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(groupProtocol.name())) {
             List<String> invalidConfigs = new ArrayList<>();
             unsupportedConfigs.forEach(configName -> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index a5c92394353..f8827ea2587 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -140,6 +140,8 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
  * thrown by a failure to commit the acknowledgements.</li>
  * <li>The application calls {@link #close()} which releases any acquired records without acknowledgement.</li>
  * </ul>
+ * <p>The consumer can optionally use the {@code internal.share.acknowledgement.mode} configuration property to choose
+ * between implicit and explicit acknowledgement, specifying <code>"implicit"</code> or <code>"explicit"</code> as required.
  * <p>
  * The consumer guarantees that the records returned in the {@code ConsumerRecords} object for a specific topic-partition
  * are in order of increasing offset. For each topic-partition, Kafka guarantees that acknowledgements for the records
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
index 2d926b2cad5..900c249d852 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
@@ -32,6 +32,8 @@ import java.util.Optional;
 import java.util.Set;
 
 /**
+ * A client that consumes records from a Kafka cluster using a share group.
+ *
  * @see KafkaShareConsumer
  * @see MockShareConsumer
  */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java
index ce22d7b0a2e..84971abf1e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java
@@ -24,9 +24,9 @@ import java.util.Map;
 import java.util.Properties;
 
 /**
- * The share consumer configuration keys
+ * The consumer configuration behavior specific to share groups.
  */
-public class ShareConsumerConfig extends ConsumerConfig {
+class ShareConsumerConfig extends ConsumerConfig {
 
     /**
      * A list of configuration keys not supported for SHARE consumer.
      */
@@ -43,11 +43,11 @@ public class ShareConsumerConfig extends ConsumerConfig {
             ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG
     );
 
-    public ShareConsumerConfig(Properties props) {
+    ShareConsumerConfig(Properties props) {
         super(props);
     }
 
-    public ShareConsumerConfig(Map<String, Object> props) {
+    ShareConsumerConfig(Map<String, Object> props) {
         super(props);
     }
 
@@ -57,11 +57,11 @@ public class ShareConsumerConfig extends ConsumerConfig {
 
     @Override
     protected Map<String, Object> preProcessParsedConfig(final Map<String, Object> parsedValues) {
-        checkUnsupportedConfigs(parsedValues);
+        checkUnsupportedConfigsPreProcess(parsedValues);
         return parsedValues;
     }
 
-    private void checkUnsupportedConfigs(Map<String, Object> parsedValues) {
+    private void checkUnsupportedConfigsPreProcess(Map<String, Object> parsedValues) {
         List<String> invalidConfigs = new ArrayList<>();
         SHARE_GROUP_UNSUPPORTED_CONFIGS.forEach(configName -> {
             if (parsedValues.containsKey(configName)) {
@@ -74,4 +74,7 @@ public class ShareConsumerConfig extends ConsumerConfig {
         }
     }
 
+    @Override
+    protected void checkUnsupportedConfigsPostProcess() {
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 4dc8cf4c697..7763f308cb0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -187,7 +187,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
         IMPLICIT
     }
 
-    private AcknowledgementMode acknowledgementMode = AcknowledgementMode.UNKNOWN;
+    private AcknowledgementMode acknowledgementMode;
 
     /**
      * A thread-safe {@link ShareFetchBuffer fetch buffer} for the results that are populated in the
@@ -258,6 +258,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
 
         this.metrics = createMetrics(config, time, reporters);
         this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
+        this.acknowledgementMode = initializeAcknowledgementMode(config, log);
 
         this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, metrics);
         this.currentFetch = ShareFetch.empty();
         this.subscriptions = createSubscriptionState(config, logContext);
@@ -369,6 +370,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
         this.subscriptions = subscriptions;
         this.metadata = metadata;
         this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+        this.acknowledgementMode = initializeAcknowledgementMode(config, log);
 
         this.fetchBuffer = new ShareFetchBuffer(logContext);
         this.completedAcknowledgements = new LinkedList<>();
@@ -463,6 +465,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
         this.metrics = metrics;
         this.metadata = metadata;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
+        this.acknowledgementMode = initializeAcknowledgementMode(null, log);
         this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
         this.currentFetch = ShareFetch.empty();
         this.applicationEventHandler = applicationEventHandler;
@@ -1062,6 +1065,25 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
         }
     }
 
+    /**
+     * Initializes the acknowledgement mode based on the configuration.
+     */
+    private static AcknowledgementMode initializeAcknowledgementMode(ConsumerConfig config, Logger log) {
+        if (config == null) {
+            return AcknowledgementMode.UNKNOWN;
+        }
+        String acknowledgementModeStr = config.getString(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
+        if ((acknowledgementModeStr == null) || acknowledgementModeStr.isEmpty()) {
+            return AcknowledgementMode.UNKNOWN;
+        } else if (acknowledgementModeStr.equalsIgnoreCase("implicit")) {
+            return AcknowledgementMode.IMPLICIT;
+        } else if (acknowledgementModeStr.equalsIgnoreCase("explicit")) {
+            return AcknowledgementMode.EXPLICIT;
+        }
+        log.warn("Invalid value for config {}: \"{}\"", ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, acknowledgementModeStr);
+        return AcknowledgementMode.UNKNOWN;
+    }
+
     /**
      * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}.
      * It is possible that {@link ErrorEvent an error}
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index 0ee38d8aba5..f485e4f816e 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -75,7 +75,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -160,7 +159,7 @@ public class ShareConsumerTest {
     public void testPollNoSubscribeFails() {
         setup();
         try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
-            assertEquals(Collections.emptySet(), shareConsumer.subscription());
+            assertEquals(Set.of(), shareConsumer.subscription());
             // "Consumer is not subscribed to any topics."
             assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500)));
         }
     }
@@ -171,7 +170,7 @@ public class ShareConsumerTest {
         setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
-            Set<String> subscription = Collections.singleton(tp.topic());
+            Set<String> subscription = Set.of(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
@@ -185,12 +184,12 @@ public class ShareConsumerTest {
         setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
-            Set<String> subscription = Collections.singleton(tp.topic());
+            Set<String> subscription = Set.of(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
             shareConsumer.unsubscribe();
-            assertEquals(Collections.emptySet(), shareConsumer.subscription());
+            assertEquals(Set.of(), shareConsumer.subscription());
             assertEquals(0, records.count());
             verifyShareGroupStateTopicRecordsProduced();
         }
@@ -201,7 +200,7 @@ public class ShareConsumerTest {
         setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
-            Set<String> subscription = Collections.singleton(tp.topic());
+            Set<String> subscription = Set.of(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
@@ -219,12 +218,12 @@ public class ShareConsumerTest {
         setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
-            Set<String> subscription = Collections.singleton(tp.topic());
+            Set<String> subscription = Set.of(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
             shareConsumer.unsubscribe();
-            assertEquals(Collections.emptySet(), shareConsumer.subscription());
+            assertEquals(Set.of(), shareConsumer.subscription());
             // "Consumer is not subscribed to any topics."
             assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500)));
             assertEquals(0, records.count());
@@ -237,12 +236,12 @@ public class ShareConsumerTest {
         setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
-            Set<String> subscription = Collections.singleton(tp.topic());
+            Set<String> subscription = Set.of(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
-            shareConsumer.subscribe(Collections.emptySet());
-            assertEquals(Collections.emptySet(), shareConsumer.subscription());
+            shareConsumer.subscribe(Set.of());
+            assertEquals(Set.of(), shareConsumer.subscription());
             // "Consumer is not subscribed to any topics."
             assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500)));
             assertEquals(0, records.count());
@@ -260,7 +259,7 @@ public class ShareConsumerTest {
             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
             producer.send(record);
             producer.flush();
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             verifyShareGroupStateTopicRecordsProduced();
@@ -277,7 +276,7 @@ public class ShareConsumerTest {
             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
             producer.send(record);
             producer.flush();
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             producer.send(record);
@@ -307,12 +306,12 @@ public class ShareConsumerTest {
             producer.flush();
 
             shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
                     DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer");
 
-            shareConsumer.subscribe(Collections.singletonList(tp2.topic()));
+            shareConsumer.subscribe(Set.of(tp2.topic()));
 
             // Waiting for heartbeat to propagate the subscription change.
             TestUtils.waitForCondition(() -> {
@@ -342,7 +341,7 @@ public class ShareConsumerTest {
             producer.flush();
 
             shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
                     DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer");
@@ -371,7 +370,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
             shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
@@ -403,7 +402,7 @@ public class ShareConsumerTest {
             producer.flush();
 
             shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
@@ -457,7 +456,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
             assertEquals(numRecords, records.size());
@@ -489,7 +488,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
             assertEquals(numRecords, records.size());
@@ -511,12 +510,12 @@ public class ShareConsumerTest {
         alterShareAutoOffsetReset("group1", "earliest");
 
         try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1",
-                Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords)))) {
+                Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords)))) {
 
             long startingTimestamp = System.currentTimeMillis();
             produceMessagesWithTimestamp(numRecords, startingTimestamp);
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
             long i = 0L;
@@ -564,7 +563,7 @@ public class ShareConsumerTest {
             transactionalProducer.close();
             nonTransactionalProducer.close();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(4, records.count());
@@ -594,7 +593,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             records.forEach(shareConsumer::acknowledge);
@@ -616,7 +615,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             records.forEach(shareConsumer::acknowledge);
@@ -645,8 +644,8 @@ public class ShareConsumerTest {
             producer.send(record3);
             producer.flush();
 
-            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
-            shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer1.subscribe(Set.of(tp.topic()));
+            shareConsumer2.subscribe(Set.of(tp.topic()));
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
@@ -693,7 +692,7 @@ public class ShareConsumerTest {
 
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
@@ -739,7 +738,7 @@ public class ShareConsumerTest {
             producer.send(record3);
             producer.flush();
 
-            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer1.subscribe(Set.of(tp.topic()));
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
@@ -799,7 +798,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@@ -822,7 +821,7 @@ public class ShareConsumerTest {
             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
             producer.send(record);
             producer.flush();
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@@ -844,7 +843,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@@ -863,7 +862,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
@@ -886,7 +885,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
@@ -908,7 +907,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             Map<TopicIdPartition, Optional<KafkaException>> result = shareConsumer.commitSync();
@@ -936,7 +935,7 @@ public class ShareConsumerTest {
             producer.send(record3);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
@@ -961,6 +960,52 @@ public class ShareConsumerTest {
         }
     }
 
+    @ClusterTest
+    public void testConfiguredExplicitAcknowledgeCommitSuccess() {
+        setup();
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                     "group1",
+                     Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"))) {
+
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
+            assertEquals(1, records.count());
+            records.forEach(shareConsumer::acknowledge);
+            producer.send(record);
+            Map<TopicIdPartition, Optional<KafkaException>> result = shareConsumer.commitSync();
+            assertEquals(1, result.size());
+            records = shareConsumer.poll(Duration.ofMillis(5000));
+            assertEquals(1, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void testConfiguredImplicitAcknowledgeExplicitAcknowledgeFails() {
+        setup();
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                     "group1",
+                     Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "implicit"))) {
+
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
+            assertEquals(1, records.count());
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(records.iterator().next()));
+        }
+    }
+
     @ClusterTest
     public void testFetchRecordLargerThanMaxPartitionFetchBytes() throws Exception {
         setup();
@@ -971,16 +1016,14 @@ public class ShareConsumerTest {
             Producer<byte[], byte[]> producer = createProducer();
             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
                 "group1",
-                Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(maxPartitionFetchBytes))
-            )
-        ) {
+                Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(maxPartitionFetchBytes)))) {
 
             ProducerRecord<byte[], byte[]> smallRecord = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
             ProducerRecord<byte[], byte[]> bigRecord = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), new byte[maxPartitionFetchBytes]);
             producer.send(smallRecord).get();
             producer.send(bigRecord).get();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(2, records.count());
@@ -999,9 +1042,8 @@ public class ShareConsumerTest {
 
             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
 
-            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
-
-            shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer1.subscribe(Set.of(tp.topic()));
+            shareConsumer2.subscribe(Set.of(tp.topic()));
 
             // producing 3 records to the topic
             producer.send(record);
@@ -1049,8 +1091,8 @@ public class ShareConsumerTest {
              ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
-            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
-            shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer1.subscribe(Set.of(tp.topic()));
+            shareConsumer2.subscribe(Set.of(tp.topic()));
 
             int totalMessages = 2000;
             for (int i = 0; i < totalMessages; i++) {
@@ -1185,8 +1227,8 @@ public class ShareConsumerTest {
             ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
-            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
-            shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer1.subscribe(Set.of(tp.topic()));
+            shareConsumer2.subscribe(Set.of(tp.topic()));
 
             int totalMessages = 1500;
             for (int i = 0; i < totalMessages; i++) {
@@ -1347,7 +1389,7 @@ public class ShareConsumerTest {
             producer.flush();
 
             shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWithShareConsumer<>(shareConsumer));
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             // The acknowledgment commit callback will try to call a method of ShareConsumer
             shareConsumer.poll(Duration.ofMillis(5000));
@@ -1370,7 +1412,7 @@ public class ShareConsumerTest {
         public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception exception) {
             // Accessing methods of ShareConsumer should throw an exception.
             assertThrows(IllegalStateException.class, shareConsumer::close);
-            assertThrows(IllegalStateException.class, () -> shareConsumer.subscribe(Collections.singleton(tp.topic())));
+            assertThrows(IllegalStateException.class, () -> shareConsumer.subscribe(Set.of(tp.topic())));
             assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(5000)));
         }
     }
@@ -1392,7 +1434,7 @@ public class ShareConsumerTest {
             // The acknowledgment commit callback will try to call a method of ShareConsumer
             shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWakeup<>(shareConsumer));
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
                     DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer");
@@ -1444,7 +1486,7 @@ public class ShareConsumerTest {
             producer.flush();
 
             shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackThrows<>());
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
                     DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer");
@@ -1479,7 +1521,7 @@ public class ShareConsumerTest {
         alterShareAutoOffsetReset("group1", "earliest");
 
         try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             // interrupt the thread and call poll
             try {
@@ -1504,7 +1546,7 @@ public class ShareConsumerTest {
         alterShareAutoOffsetReset("group1", "earliest");
 
         try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
-            shareConsumer.subscribe(Collections.singleton("topic abc"));
+            shareConsumer.subscribe(Set.of("topic abc"));
 
             // The exception depends upon a metadata response which arrives asynchronously. If the delay is
             // too short, the poll might return before the error is known.
@@ -1527,7 +1569,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             shareConsumer.wakeup();
             assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ZERO));
@@ -1546,7 +1588,7 @@ public class ShareConsumerTest {
             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
 
             String topic = "foo";
-            shareConsumer.subscribe(Collections.singleton(topic));
+            shareConsumer.subscribe(Set.of(topic));
 
             // Topic is created post creation of share consumer and subscription
             createTopic(topic);
@@ -1630,7 +1672,7 @@ public class ShareConsumerTest {
             }
 
             // We delete records before offset 5, so the LSO should move to 5.
-            adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L)));
+            adminClient.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(5L)));
 
             int messageCount = consumeMessages(new AtomicInteger(0), 5, groupId, 1, 10, true);
             // The records returned belong to offsets 5-9.
@@ -1642,14 +1684,14 @@ public class ShareConsumerTest {
             }
 
             // We delete records before offset 14, so the LSO should move to 14.
-            adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(14L)));
+            adminClient.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(14L)));
 
             int consumeMessagesCount = consumeMessages(new AtomicInteger(0), 1, groupId, 1, 10, true);
             // The record returned belong to offset 14.
             assertEquals(1, consumeMessagesCount);
 
             // We delete records before offset 15, so the LSO should move to 15 and now no records should be returned.
-            adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(15L)));
+            adminClient.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(15L)));
 
             messageCount = consumeMessages(new AtomicInteger(0), 0, groupId, 1, 5, true);
             assertEquals(0, messageCount);
@@ -1663,7 +1705,7 @@ public class ShareConsumerTest {
         try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1");
             Producer<byte[], byte[]> producer = createProducer()) {
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
             // Producing a record.
             producer.send(record);
@@ -1691,7 +1733,7 @@ public class ShareConsumerTest {
         try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1");
             Producer<byte[], byte[]> producer = createProducer()) {
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
             // Producing a record.
             producer.send(record);
@@ -1719,8 +1761,7 @@ public class ShareConsumerTest {
             Producer<byte[], byte[]> producer = createProducer();
             Admin adminClient = createAdminClient()
        ) {
-
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
 
             // We write 10 records to the topic, so they would be written from offsets 0-9 on the topic.
@@ -1729,7 +1770,7 @@ public class ShareConsumerTest {
             }
 
             // We delete records before offset 5, so the LSO should move to 5.
-            adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L)));
+            adminClient.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(5L)));
 
             int consumedMessageCount = consumeMessages(new AtomicInteger(0), 5, "group1", 1, 10, true);
             // The records returned belong to offsets 5-9.
@@ -1747,9 +1788,8 @@ public class ShareConsumerTest {
             ShareConsumer<byte[], byte[]> shareConsumerLatest = createShareConsumer("group2");
             Producer<byte[], byte[]> producer = createProducer()) {
 
-            shareConsumerEarliest.subscribe(Collections.singleton(tp.topic()));
-
-            shareConsumerLatest.subscribe(Collections.singleton(tp.topic()));
+            shareConsumerEarliest.subscribe(Set.of(tp.topic()));
+            shareConsumerLatest.subscribe(Set.of(tp.topic()));
 
             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
             // Producing a record.
@@ -1805,7 +1845,7 @@ public class ShareConsumerTest {
             producer.send(currentRecord).get();
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             // Should only receive messages from last hour (recent and current)
             List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, 2);
@@ -1824,7 +1864,7 @@ public class ShareConsumerTest {
         try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group2");
             Producer<byte[], byte[]> producer = createProducer()) {
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, 3);
             assertEquals(3, records.size());
             verifyShareGroupStateTopicRecordsProduced();
@@ -2123,7 +2163,7 @@ public class ShareConsumerTest {
         return assertDoesNotThrow(() -> {
             try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
                     groupId)) {
-                shareConsumer.subscribe(Collections.singleton(tp.topic()));
+                shareConsumer.subscribe(Set.of(tp.topic()));
                 return consumeMessages(shareConsumer, totalMessagesConsumed, totalMessages, consumerNumber, maxPolls, commit);
             }
         }, "Consumer " + consumerNumber + " failed with exception");
@@ -2140,7 +2180,7 @@ public class ShareConsumerTest {
             try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
                     groupId,
                    Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxFetchBytes))) {
-                shareConsumer.subscribe(Collections.singleton(tp.topic()));
+                shareConsumer.subscribe(Set.of(tp.topic()));
                 return consumeMessages(shareConsumer, totalMessagesConsumed, totalMessages, consumerNumber, maxPolls, commit);
             }
         }, "Consumer " + consumerNumber + " failed with exception");
@@ -2200,7 +2240,7 @@ public class ShareConsumerTest {
         AtomicReference<Uuid> topicId = new AtomicReference<>(null);
         assertDoesNotThrow(() -> {
             try (Admin admin = createAdminClient()) {
-                CreateTopicsResult result = admin.createTopics(Collections.singleton(new NewTopic(topicName, numPartitions, (short) replicationFactor)));
+                CreateTopicsResult result = admin.createTopics(Set.of(new NewTopic(topicName, numPartitions, (short) replicationFactor)));
                 result.all().get();
                 topicId.set(result.topicId(topicName).get());
             }
@@ -2212,7 +2252,7 @@ public class ShareConsumerTest {
     private void deleteTopic(String topicName) {
         assertDoesNotThrow(() -> {
             try (Admin admin = createAdminClient()) {
-                admin.deleteTopics(Collections.singleton(topicName)).all().get();
+                admin.deleteTopics(Set.of(topicName)).all().get();
             }
         }, "Failed to delete topic");
    }
@@ -2257,7 +2297,7 @@ public class ShareConsumerTest {
         createTopic(warmupTp.topic());
         waitForMetadataCache();
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(warmupTp.topic(), warmupTp.partition(), null, "key".getBytes(), "value".getBytes());
-        Set<String> subscription = Collections.singleton(warmupTp.topic());
+        Set<String> subscription = Set.of(warmupTp.topic());
         alterShareAutoOffsetReset("warmupgroup1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("warmupgroup1")) {