This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 81d86e59c28 KAFKA-19845: 2/N Additional share consumers tests. (#20900)
81d86e59c28 is described below
commit 81d86e59c281fbe837cce854c1c5f6e1ac608967
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue Nov 18 14:22:27 2025 +0530
KAFKA-19845: 2/N Additional share consumers tests. (#20900)
* A couple of additional share consumer tests to check dual ack with
renew and poll behavior with renew acks piggybacked.
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 123 +++++++++++++++++++++
1 file changed, 123 insertions(+)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 90164404b33..62a4441a29a 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -62,10 +62,14 @@ import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
+import com.yammer.metrics.core.Meter;
+
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
@@ -162,6 +166,11 @@ public class ShareConsumerTest {
}
}
+ @AfterEach
+ public void tearDown() {
+ kafka.utils.TestUtils.clearYammerMetrics();
+ }
+
@ClusterTest
public void testPollNoSubscribeFails() {
try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -2960,6 +2969,7 @@ public class ShareConsumerTest {
shareConsumer.commitSync();
assertEquals(15, acknowledgementsCommitted.get());
}
+ verifyYammerMetricCount("ackType=Renew", 5);
}
@ClusterTest
@@ -3005,6 +3015,119 @@ public class ShareConsumerTest {
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
}
}
+ verifyYammerMetricCount("ackType=Renew", 5);
+ }
+
+ @ClusterTest
+ public void testRenewAcknowledgementInvalidStateRecord() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))
+ ) {
+ AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
+
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition,
exception) ->
+ offsetsByTopicPartition.forEach((tip, offsets) ->
acknowledgementsCommitted.addAndGet(offsets.size())));
+
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message
".getBytes());
+ producer.send(record);
+ producer.flush();
+
+ shareConsumer.subscribe(List.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+
+ for (ConsumerRecord<byte[], byte[]> rec : records) {
+ shareConsumer.acknowledge(rec, AcknowledgeType.REJECT);
+ shareConsumer.commitSync();
+ assertThrows(IllegalStateException.class, () ->
shareConsumer.acknowledge(rec, AcknowledgeType.RENEW));
+ }
+ }
+ verifyYammerMetricCount("ackType=Renew", 0);
+ }
+
+ @ClusterTest(
+ brokers = 1,
+ serverProperties = {
+ @ClusterConfigProperty(key =
"group.share.record.lock.duration.ms", value = "12000"),
+ @ClusterConfigProperty(key =
"group.share.min.record.lock.duration.ms", value = "12000"),
+ }
+ )
+ public void testRenewAcknowledgementNoResultInPoll() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))
+ ) {
+ AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
+
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition,
exception) ->
+ offsetsByTopicPartition.forEach((tip, offsets) ->
acknowledgementsCommitted.addAndGet(offsets.size())));
+
+ for (int i = 0; i < 10; i++) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message
" + i).getBytes());
+ producer.send(record);
+ }
+ producer.flush();
+
+ shareConsumer.subscribe(List.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 10);
+ assertEquals(10, records.count());
+
+ int count = 0;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ if (count % 2 == 0) {
+ shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+ } else {
+ shareConsumer.acknowledge(record, AcknowledgeType.RENEW);
+ }
+ count++;
+ }
+
+ // 5 more records (total 15 produced).
+ for (int i = 10; i < 15; i++) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message
" + i).getBytes());
+ producer.send(record);
+ }
+
+ // Get the rest of all 5 records.
+ records = waitedPoll(shareConsumer, 11500L, 0); // This will send
the acks but not return next 5 records (10-15)
+ assertEquals(10, acknowledgementsCommitted.get());
+ assertEquals(0, records.count());
+ verifyYammerMetricCount("ackType=Renew", 5);
+
+ // Renewal duration passed, now records will be back.
+ records = waitedPoll(shareConsumer, 2500L, 5); // Renewed records
as well as 10-15 records.
+ assertEquals(5, records.count());
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+ }
+
+ shareConsumer.commitSync();
+
+ records = waitedPoll(shareConsumer, 2500L, 5); // 10-15 records.
+ assertEquals(5, records.count());
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+ }
+
+ shareConsumer.commitSync();
+
+ // Initial - 5 renew + 5 accept, Subsequent - 5 renewed accepted +
5 fresh accepted (10-15)
+ assertEquals(20, acknowledgementsCommitted.get());
+ }
+ verifyYammerMetricCount("ackType=Renew", 5);
+ }
+
+ private void verifyYammerMetricCount(String filterString, int count) {
+ com.yammer.metrics.core.Metric renewAck =
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+ .filter(entry -> entry.getKey().toString().contains(filterString))
+ .findAny()
+ .orElseThrow(() -> new AssertionError("metric not found"))
+ .getValue();
+
+ assertEquals(count, ((Meter) renewAck).count());
}
/**