This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3e9ae03b9ca KAFKA-20034: set deliveryCompleteCount to 0 when share 
partition is initialized with non negative start offset. (#21246)
3e9ae03b9ca is described below

commit 3e9ae03b9ca7d1465218912b9067f464adee4c81
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Tue Jan 6 14:13:54 2026 +0530

    KAFKA-20034: set deliveryCompleteCount to 0 when share partition is 
initialized with non negative start offset. (#21246)
    
    Currently, when the start offset for a share partition is altered, the
    deliveryCompleteCount is set to -1. As a result, the lag is displayed
    as a hyphen until consumption begins and some write states are sent to
    the persister. But when the start offset is altered to a non-negative
    value, the system should be able to calculate the lag, irrespective of
    whether consumption has begun from the new position. This PR resolves
    this by setting the deliveryCompleteCount to 0 whenever the start
    offset is changed to a non-negative value.
    
    Test Results ->  <img width="1726" height="928" alt="image"
    src="https://github.com/user-attachments/assets/35dc8b43-4590-4d80-868e-1764d7ceb2f8"
    />
    
    Reviewers: Sushant Mahajan <[email protected]>, Andrew Schofield
    <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  | 123 +++++++++++++++++++++
 .../server/share/persister/PartitionFactory.java   |   7 +-
 .../kafka/coordinator/share/ShareGroupOffset.java  |  11 +-
 3 files changed, 137 insertions(+), 4 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 85fa049636e..bd62d889083 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -21,8 +21,10 @@ import kafka.server.KafkaBroker;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
@@ -3411,6 +3413,97 @@ public class ShareConsumerTest {
         }
     }
 
+    // Checks the lag reported for a share partition once its start offset
+    // has been altered and before any consumer has resumed consumption.
+    @ClusterTest
+    public void testSharePartitionLagAfterAlterShareGroupOffsets() {
+        String groupId = "group1";
+        try (Producer<byte[], byte[]> producer = createProducer();
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"Message".getBytes());
+            // Producing 100 records to the topic partition.
+            for (int i = 0; i < 100; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+
+            // Create a new share consumer. Since the share.auto.offset.reset 
is not altered, it should be latest by default.
+            ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(groupId, 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure it joins the group and 
subscribes to the topic.
+            waitedPoll(shareConsumer, 2500L, 0, true, groupId, List.of(new 
TopicPartition(tp.topic(), 0)));
+            // Producing 5 additional records to the topic partition.
+            for (int i = 0; i < 5; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+            // Polling share consumer to make sure the records are consumed.
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the records first to move the offset forward and register 
the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the records are 
consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the share consumer so that the offsets can be altered.
+            shareConsumer.close();
+            // Alter the start offset of the share partition to 40.
+            alterShareGroupOffsets(adminClient, groupId, tp, 40L);
+            // After altering, the share partition start offset should be 40.
+            verifySharePartitionStartOffset(adminClient, groupId, tp, 40L);
+            // Verify that the lag is now 65 since the start offset is altered 
to 40 and there are total 105 records in the partition.
+            verifySharePartitionLag(adminClient, groupId, tp, 65L);
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Test failed with exception: " + e.getMessage());
+        }
+    }
+
+    // Checks that the lag is reported correctly again after the share group
+    // offsets are deleted and the records are re-consumed from the start.
+    @ClusterTest
+    public void testSharePartitionLagAfterDeleteShareGroupOffsets() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"Message".getBytes());
+            // Producing 5 records to the topic partition.
+            for (int i = 0; i < 5; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+            // Create a new share consumer.
+            ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(groupId, 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure it joins the group and 
consumes the produced records.
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the records first to move the offset forward and 
register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the records are 
consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the share consumer so that the offsets can be deleted.
+            shareConsumer.close();
+            // Delete the share group offsets.
+            deleteShareGroupOffsets(adminClient, groupId, tp.topic());
+            // Verify that the share partition offsets are deleted.
+            verifySharePartitionOffsetsDeleted(adminClient, groupId, tp);
+            // Create a new share consumer.
+            ShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer(groupId, 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer2.subscribe(List.of(tp.topic()));
+            // Since the offsets are deleted, the share consumer should 
consume from the beginning (share.auto.offset.reset is earliest).
+            // Thus, the consumer should consume all 5 records again.
+            records = waitedPoll(shareConsumer2, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the records first to move the offset forward and 
register the state with persister.
+            records.forEach(r -> shareConsumer2.acknowledge(r, 
AcknowledgeType.ACCEPT));
+            shareConsumer2.commitSync();
+            // After accepting, the lag should be 0 because the records are 
consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the second share consumer at the end of the test.
+            shareConsumer2.close();
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Test failed with exception: " + e.getMessage());
+        }
+    }
+
     @ClusterTest
     public void testFetchWithThrottledDelivery() {
         alterShareAutoOffsetReset("group1", "earliest");
@@ -4095,6 +4188,14 @@ public class ShareConsumerTest {
         return partitionResult;
     }
 
+    // Waits until the share partition's reported start offset equals
+    // expectedStartOffset; fails with a timeout message otherwise.
+    private void verifySharePartitionStartOffset(Admin adminClient, String 
groupId, TopicPartition tp, long expectedStartOffset) throws 
InterruptedException {
+        TestUtils.waitForCondition(() -> {
+            SharePartitionOffsetInfo sharePartitionOffsetInfo = 
sharePartitionOffsetInfo(adminClient, groupId, tp);
+            return sharePartitionOffsetInfo != null &&
+                sharePartitionOffsetInfo.startOffset() == expectedStartOffset;
+        }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to 
retrieve share partition start offset");
+    }
+
     private void verifySharePartitionLag(Admin adminClient, String groupId, 
TopicPartition tp, long expectedLag) throws InterruptedException {
         TestUtils.waitForCondition(() -> {
             SharePartitionOffsetInfo sharePartitionOffsetInfo = 
sharePartitionOffsetInfo(adminClient, groupId, tp);
@@ -4104,6 +4205,28 @@ public class ShareConsumerTest {
         }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to 
retrieve share partition lag");
     }
 
+    // Waits until no offset info is returned for the share partition,
+    // i.e. sharePartitionOffsetInfo(...) yields null.
+    private void verifySharePartitionOffsetsDeleted(Admin adminClient, String 
groupId, TopicPartition tp) throws InterruptedException {
+        TestUtils.waitForCondition(
+            () -> sharePartitionOffsetInfo(adminClient, groupId, tp) == null,
+            DEFAULT_MAX_WAIT_MS,
+            DEFAULT_POLL_INTERVAL_MS,
+            () -> "Share partition offsets were not deleted");
+    }
+
+    // Alters the share group's offset for topicPartition to newOffset and
+    // blocks until the result for that partition is available.
+    // newOffset is a primitive so that a null can never reach Map.of.
+    private void alterShareGroupOffsets(Admin adminClient, String groupId, 
TopicPartition topicPartition, long newOffset) throws InterruptedException, 
ExecutionException {
+        adminClient.alterShareGroupOffsets(
+            groupId,
+            Map.of(topicPartition, newOffset),
+            new 
AlterShareGroupOffsetsOptions().timeoutMs(30000)).partitionResult(topicPartition).get();
+    }
+
+    // Deletes the share group's offsets for the given topic and blocks
+    // until the result for that topic is available.
+    private void deleteShareGroupOffsets(Admin adminClient, String groupId, 
String topic) throws InterruptedException, ExecutionException {
+        adminClient.deleteShareGroupOffsets(
+            groupId,
+            Set.of(topic),
+            new 
DeleteShareGroupOffsetsOptions().timeoutMs(30000)).topicResult(topic).get();
+    }
+
     private void alterShareRecordLockDurationMs(String groupId, int newValue) {
         ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, groupId);
         Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
index 215e95ff085..b36c483a53f 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
@@ -42,7 +42,12 @@ public class PartitionFactory {
     }
 
     public static PartitionStateData newPartitionStateData(int partition, int 
stateEpoch, long startOffset) {
-        return new PartitionData(partition, stateEpoch, startOffset, 
UNINITIALIZED_DELIVERY_COMPLETE_COUNT, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, 
DEFAULT_LEADER_EPOCH, null);
+        // If the start offset is uninitialized (when the share partition is 
being initialized for the first time), the 
+        // consumption hasn't started yet, and lag cannot be calculated. Thus, 
deliveryCompleteCount is also set as -1. 
+        // But, if start offset is a non-negative value (when the start offset 
is altered), the lag can be calculated 
+        // from that point onward. Hence, we set deliveryCompleteCount to 0 in 
that case.
+        int deliveryCompleteCount = startOffset == UNINITIALIZED_START_OFFSET 
? UNINITIALIZED_DELIVERY_COMPLETE_COUNT : 0;
+        return new PartitionData(partition, stateEpoch, startOffset, 
deliveryCompleteCount, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, 
DEFAULT_LEADER_EPOCH, null);
     }
 
     public static PartitionErrorData newPartitionErrorData(int partition, 
short errorCode, String errorMessage) {
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
index 9f1a2644257..ac17045733d 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
@@ -35,6 +35,7 @@ public class ShareGroupOffset {
     public static final int NO_TIMESTAMP = 0;
     public static final int UNINITIALIZED_EPOCH = 0;
     public static final int UNINITIALIZED_DELIVERY_COMPLETE_COUNT = -1;
+    public static final int UNINITIALIZED_START_OFFSET = -1;
     public static final int DEFAULT_EPOCH = 0;
 
     private final int snapshotEpoch;
@@ -160,14 +161,18 @@ public class ShareGroupOffset {
     }
 
     public static ShareGroupOffset 
fromRequest(InitializeShareGroupStateRequestData.PartitionData data, int 
snapshotEpoch, long timestamp) {
-        // This method is invoked during InitializeShareGroupStateRequest. 
Since the deliveryCompleteCount is not yet
-        // known at this stage, it is initialized to its default value.
+        // This method is invoked during InitializeShareGroupStateRequest. If 
the start offset is uninitialized (when the 
+        // share partition is being initialized for the first time), the 
consumption hasn't started yet, and lag cannot
+        // be calculated. Thus, deliveryCompleteCount is also set as -1. But, 
if start offset is a non-negative value (when 
+        // the start offset is altered), the lag can be calculated from that 
point onward. Hence, we set deliveryCompleteCount
+        // to 0 in that case.
+        int deliveryCompleteCount = data.startOffset() == 
UNINITIALIZED_START_OFFSET ? UNINITIALIZED_DELIVERY_COMPLETE_COUNT : 0;
         return new ShareGroupOffset(
             snapshotEpoch,
             data.stateEpoch(),
             UNINITIALIZED_EPOCH,
             data.startOffset(),
-            UNINITIALIZED_DELIVERY_COMPLETE_COUNT,
+            deliveryCompleteCount,
             List.of(),
             timestamp,
             timestamp

Reply via email to