This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 81d86e59c28 KAFKA-19845: 2/N Additional share consumers tests. (#20900)
81d86e59c28 is described below

commit 81d86e59c281fbe837cce854c1c5f6e1ac608967
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue Nov 18 14:22:27 2025 +0530

    KAFKA-19845: 2/N Additional share consumers tests. (#20900)
    
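    * Adds two share consumer tests: one checks that a record cannot be
    acknowledged with RENEW after it has already been rejected and
    committed (dual ack), and the other checks poll behavior when RENEW
    acknowledgements are piggybacked on the poll.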
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  | 123 +++++++++++++++++++++
 1 file changed, 123 insertions(+)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 90164404b33..62a4441a29a 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -62,10 +62,14 @@ import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.test.NoRetryException;
 import org.apache.kafka.test.TestUtils;
 
+import com.yammer.metrics.core.Meter;
+
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
@@ -162,6 +166,11 @@ public class ShareConsumerTest {
         }
     }
 
+    @AfterEach
+    public void tearDown() {
+        kafka.utils.TestUtils.clearYammerMetrics();
+    }
+
     @ClusterTest
     public void testPollNoSubscribeFails() {
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -2960,6 +2969,7 @@ public class ShareConsumerTest {
             shareConsumer.commitSync();
             assertEquals(15, acknowledgementsCommitted.get());
         }
+        verifyYammerMetricCount("ackType=Renew", 5);
     }
 
     @ClusterTest
@@ -3005,6 +3015,119 @@ public class ShareConsumerTest {
                 shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
             }
         }
+        verifyYammerMetricCount("ackType=Renew", 5);
+    }
+
+    @ClusterTest
+    public void testRenewAcknowledgementInvalidStateRecord() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
+            
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, 
exception) ->
+                offsetsByTopicPartition.forEach((tip, offsets) -> 
acknowledgementsCommitted.addAndGet(offsets.size())));
+
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message 
".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(List.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 1);
+            assertEquals(1, records.count());
+
+            for (ConsumerRecord<byte[], byte[]> rec : records) {
+                shareConsumer.acknowledge(rec, AcknowledgeType.REJECT);
+                shareConsumer.commitSync();
+                assertThrows(IllegalStateException.class, () -> 
shareConsumer.acknowledge(rec, AcknowledgeType.RENEW));
+            }
+        }
+        verifyYammerMetricCount("ackType=Renew", 0);
+    }
+
+    @ClusterTest(
+        brokers = 1,
+        serverProperties = {
+            @ClusterConfigProperty(key = 
"group.share.record.lock.duration.ms", value = "12000"),
+            @ClusterConfigProperty(key = 
"group.share.min.record.lock.duration.ms", value = "12000"),
+        }
+    )
+    public void testRenewAcknowledgementNoResultInPoll() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
+            
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, 
exception) ->
+                offsetsByTopicPartition.forEach((tip, offsets) -> 
acknowledgementsCommitted.addAndGet(offsets.size())));
+
+            for (int i = 0; i < 10; i++) {
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message 
" + i).getBytes());
+                producer.send(record);
+            }
+            producer.flush();
+
+            shareConsumer.subscribe(List.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 10);
+            assertEquals(10, records.count());
+
+            int count = 0;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                if (count % 2 == 0) {
+                    shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+                } else {
+                    shareConsumer.acknowledge(record, AcknowledgeType.RENEW);
+                }
+                count++;
+            }
+
+            // 5 more records (total 15 produced).
+            for (int i = 10; i < 15; i++) {
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message 
" + i).getBytes());
+                producer.send(record);
+            }
+
+            // Poll again: sends the pending acks; the 5 new records must not be returned yet.
+            records = waitedPoll(shareConsumer, 11500L, 0);  // This will send 
the acks but not return next 5 records (10-15)
+            assertEquals(10, acknowledgementsCommitted.get());
+            assertEquals(0, records.count());
+            verifyYammerMetricCount("ackType=Renew", 5);
+
+            // Renewal duration passed, now records will be back.
+            records = waitedPoll(shareConsumer, 2500L, 5);  // Renewed records 
as well as 10-15 records.
+            assertEquals(5, records.count());
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+            }
+
+            shareConsumer.commitSync();
+
+            records = waitedPoll(shareConsumer, 2500L, 5);  // 10-15 records.
+            assertEquals(5, records.count());
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+            }
+
+            shareConsumer.commitSync();
+
+            // Initial - 5 renew + 5 accept, Subsequent - 5 renewed accepted + 
5 fresh accepted (10-15)
+            assertEquals(20, acknowledgementsCommitted.get());
+        }
+        verifyYammerMetricCount("ackType=Renew", 5);
+    }
+
+    private void verifyYammerMetricCount(String filterString, int count) {
+        com.yammer.metrics.core.Metric renewAck = 
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+            .filter(entry -> entry.getKey().toString().contains(filterString))
+            .findAny()
+            .orElseThrow(() -> new AssertionError("metric not found"))
+            .getValue();
+
+        assertEquals(count, ((Meter) renewAck).count());
     }
 
     /**

Reply via email to