This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 214bb07d6a4 KAFKA-19845: [1/N] Renew acks in share consumer (#20838)
214bb07d6a4 is described below

commit 214bb07d6a47f67b37b7c3aed86ebebe7f7ac992
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Nov 11 20:21:51 2025 +0000

    KAFKA-19845: [1/N] Renew acks in share consumer (#20838)
    
    Implements AcknowledgeType.RENEW in the share consumer as part of
    KIP-1222. There will be a future PR with additional tests.
    
    Reviewers: Apoorv Mittal <[email protected]>, Shivsundar R
     <[email protected]>, Abhinav Dixit <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  |  92 ++++++++++++++++-
 .../consumer/internals/Acknowledgements.java       |   9 ++
 .../internals/ShareConsumeRequestManager.java      | 114 +++++++++++++--------
 .../consumer/internals/ShareConsumerImpl.java      |  70 ++++++++++---
 .../clients/consumer/internals/ShareFetch.java     |  65 +++++++++++-
 .../consumer/internals/ShareInFlightBatch.java     |  90 ++++++++++++++--
 .../consumer/internals/ShareSessionHandler.java    |  50 +++++++--
 .../consumer/internals/events/BackgroundEvent.java |   1 +
 .../ShareRenewAcknowledgementsCompleteEvent.java   |  42 ++++++++
 .../common/requests/ShareAcknowledgeRequest.java   |   3 +-
 .../kafka/common/requests/ShareFetchRequest.java   |   6 +-
 .../common/message/ShareFetchRequest.json          |   2 +-
 .../consumer/internals/AcknowledgementsTest.java   |  25 ++++-
 .../internals/ShareCompletedFetchTest.java         |  98 +++++++++---------
 .../internals/ShareConsumeRequestManagerTest.java  |  87 ++++++++++++++--
 .../consumer/internals/ShareConsumerImplTest.java  |  87 ++++++++++++++++
 .../internals/ShareFetchCollectorTest.java         |  68 ++++++++++++
 .../kafka/server/share/SharePartitionManager.java  |   4 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +-
 .../server/ShareFetchAcknowledgeRequestTest.scala  |  11 +-
 20 files changed, 772 insertions(+), 154 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 2f7e72fce95..4cc6c195eac 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -133,6 +133,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class ShareConsumerTest {
     private final ClusterInstance cluster;
     private final TopicPartition tp = new TopicPartition("topic", 0);
+    private Uuid tpId;
     private final TopicPartition tp2 = new TopicPartition("topic2", 0);
     private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
     private List<TopicPartition> sgsTopicPartitions;
@@ -151,7 +152,7 @@ public class ShareConsumerTest {
     public void setup() {
         try {
             this.cluster.waitForReadyBrokers();
-            createTopic("topic");
+            tpId = createTopic("topic");
             createTopic("topic2");
             sgsTopicPartitions = IntStream.range(0, 3)
                 .mapToObj(part -> new 
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, part))
@@ -2906,6 +2907,95 @@ public class ShareConsumerTest {
         }
     }
 
+    @ClusterTest
+    public void testRenewAcknowledgementOnPoll() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
+            
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, 
exception) ->
+                offsetsByTopicPartition.forEach((tip, offsets) -> 
acknowledgementsCommitted.addAndGet(offsets.size())));
+
+            for (int i = 0; i < 10; i++) {
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message 
" + i).getBytes());
+                producer.send(record);
+            }
+            producer.flush();
+
+            shareConsumer.subscribe(List.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 10);
+            assertEquals(10, records.count());
+
+            int count = 0;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                if (count % 2 == 0) {
+                    shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+                } else {
+                    shareConsumer.acknowledge(record, AcknowledgeType.RENEW);
+                }
+                count++;
+            }
+
+            // Get the rest of all 5 records.
+            records = waitedPoll(shareConsumer, 2500L, 5);
+            assertEquals(5, records.count());
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+            }
+
+            shareConsumer.commitSync();
+            assertEquals(15, acknowledgementsCommitted.get());
+        }
+    }
+
+    @ClusterTest
+    public void testRenewAcknowledgementOnCommitSync() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
+            
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, 
exception) ->
+                offsetsByTopicPartition.forEach((tip, offsets) -> 
acknowledgementsCommitted.addAndGet(offsets.size())));
+
+            for (int i = 0; i < 10; i++) {
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message 
" + i).getBytes());
+                producer.send(record);
+            }
+            producer.flush();
+
+            shareConsumer.subscribe(List.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 10);
+            assertEquals(10, records.count());
+
+            int count = 0;
+            Map<TopicIdPartition, Optional<KafkaException>> result;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                if (count % 2 == 0) {
+                    shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+                } else {
+                    shareConsumer.acknowledge(record, AcknowledgeType.RENEW);
+                }
+                result = shareConsumer.commitSync();
+                assertEquals(1, result.size());
+                assertEquals(Optional.empty(), result.get(new 
TopicIdPartition(tpId, tp.partition(), tp.topic())));
+                count++;
+            }
+
+            // Get the rest of all 5 records.
+            records = waitedPoll(shareConsumer, 2500L, 5);
+            assertEquals(5, records.count());
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+            }
+        }
+    }
+
     /**
      * Util class to encapsulate state for a consumer/producer
      * being executed by an {@link ExecutorService}.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java
index 5bce77651b9..a60d24520d0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java
@@ -143,6 +143,15 @@ public class Acknowledgements {
         return acknowledgeException;
     }
 
+    /**
+     * Whether an acknowledgement error code was received in the response from 
the broker.
+     *
+     * @return Whether an acknowledgement error code was received in the 
response from the broker.
+     */
+    public boolean isCompletedExceptionally() {
+        return acknowledgeException != null;
+    }
+
     /**
      * Merges two sets of acknowledgements. If there are overlapping 
acknowledgements, the
      * merged set wins.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 5b5214c834a..eff925d342e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -22,6 +22,7 @@ import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollRes
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -200,7 +201,6 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             }
         }
 
-
         // Iterate over the session handlers to see if there are 
acknowledgements to be sent for partitions
         // which are no longer part of the current subscription.
         // We fail acknowledgements for records fetched from a previous leader.
@@ -228,6 +228,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             } else {
                                 log.debug("Leader for the partition is down or 
has changed, failing Acknowledgements for partition {}", tip);
                                 
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                                
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));
                                 
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acks));
                             }
                         });
@@ -262,6 +263,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
     }
 
     /**
+     * Add acknowledgements for a topic-partition to the node's in-flight 
acknowledgements.
      *
      * @return True if we can add acknowledgements to the share session.
      * If we cannot add acknowledgements, they are completed with {@link 
Errors#INVALID_SHARE_SESSION_EPOCH} exception.
@@ -274,6 +276,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             // Failing the acknowledgements as we cannot have piggybacked 
acknowledgements in the initial ShareFetchRequest.
             log.debug("Cannot send acknowledgements on initial epoch for 
ShareSession for partition {}", tip);
             
acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
+            sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, 
acknowledgements));
             maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, 
acknowledgements));
             return false;
         } else {
@@ -389,6 +392,11 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         }
     }
 
+    private void 
sendShareRenewAcknowledgementsCompleteEvent(Map<TopicIdPartition, 
Acknowledgements> acknowledgementsMap) {
+        ShareRenewAcknowledgementsCompleteEvent event = new 
ShareRenewAcknowledgementsCompleteEvent(acknowledgementsMap);
+        backgroundEventHandler.add(event);
+    }
+
     /**
      *
      * @param acknowledgeRequestState Contains the acknowledgements to be sent.
@@ -536,6 +544,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             resultCount.incrementAndGet();
                         } else {
                             
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                            
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()));
                             
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()));
                         }
                     }
@@ -612,6 +621,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             }
                         } else {
                             
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                            
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()));
                             
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()));
                         }
                     }
@@ -650,6 +660,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 }
             } else {
                 
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, 
nodeAcks.acknowledgements()));
                 maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, 
nodeAcks.acknowledgements()));
             }
         });
@@ -669,6 +680,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             }
                         } else {
                             
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                            
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));
                             
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acks));
                         }
                     });
@@ -695,7 +707,8 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             new IllegalStateException("Attempt to call close() 
when there is an existing close request for node : " + node.id()));
                 } else {
                     // There can only be one close() happening at a time. So 
per node, there will be one acknowledge request state.
-                    acknowledgeRequestStates.get(nodeId).setCloseRequest(new 
AcknowledgeRequestState(logContext,
+                    acknowledgeRequestStates.get(nodeId).setCloseRequest(
+                        new AcknowledgeRequestState(logContext,
                             ShareConsumeRequestManager.class.getSimpleName() + 
":3",
                             deadlineMs,
                             retryBackoffMs,
@@ -737,7 +750,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
     }
 
     private void handleShareFetchSuccess(Node fetchTarget,
-                                         @SuppressWarnings("unused") 
ShareFetchRequestData requestData,
+                                         ShareFetchRequestData requestData,
                                          ClientResponse resp) {
         try {
             log.debug("Completed ShareFetch request from node {} 
successfully", fetchTarget.id());
@@ -756,13 +769,16 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 if (response.error() == Errors.UNKNOWN_TOPIC_ID) {
                     metadata.requestUpdate(false);
                 }
-                // Complete any inFlight acknowledgements with the error code 
from the response.
+                // Complete any in-flight acknowledgements with the error code 
from the response.
                 Map<TopicIdPartition, Acknowledgements> 
nodeAcknowledgementsInFlight = 
fetchAcknowledgementsInFlight.get(fetchTarget.id());
                 if (nodeAcknowledgementsInFlight != null) {
                     nodeAcknowledgementsInFlight.forEach((tip, acks) -> {
                         
acks.complete(Errors.forCode(response.error().code()).exception());
                         
metricsManager.recordFailedAcknowledgements(acks.size());
                     });
+                    if (requestData.isRenewAck()) {
+                        
sendShareRenewAcknowledgementsCompleteEvent(nodeAcknowledgementsInFlight);
+                    }
                     
maybeSendShareAcknowledgeCommitCallbackEvent(nodeAcknowledgementsInFlight);
                     nodeAcknowledgementsInFlight.clear();
                 }
@@ -772,12 +788,12 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
responseData = new LinkedHashMap<>();
 
             response.data().responses().forEach(topicResponse ->
-                    topicResponse.partitions().forEach(partition -> {
-                        TopicIdPartition tip = 
lookupTopicId(topicResponse.topicId(), partition.partitionIndex());
-                        if (tip != null) {
-                            responseData.put(tip, partition);
-                        }
-                    })
+                topicResponse.partitions().forEach(partition -> {
+                    TopicIdPartition tip = 
lookupTopicId(topicResponse.topicId(), partition.partitionIndex());
+                    if (tip != null) {
+                        responseData.put(tip, partition);
+                    }
+                })
             );
 
             final Set<TopicPartition> partitions = 
responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
@@ -802,6 +818,9 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                         
acks.complete(Errors.forCode(partitionData.acknowledgeErrorCode())
                                 
.exception(partitionData.acknowledgeErrorMessage()));
                         Map<TopicIdPartition, Acknowledgements> acksMap = 
Map.of(tip, acks);
+                        if (requestData.isRenewAck()) {
+                            
sendShareRenewAcknowledgementsCompleteEvent(acksMap);
+                        }
                         maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
                     }
                 }
@@ -839,6 +858,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             if (fetchAcknowledgementsInFlight.get(fetchTarget.id()) != null) {
                 
fetchAcknowledgementsInFlight.remove(fetchTarget.id()).forEach((partition, 
acknowledgements) -> {
                     acknowledgements.complete(new 
InvalidRecordStateException(INVALID_RESPONSE));
+                    
sendShareRenewAcknowledgementsCompleteEvent(Map.of(partition, 
acknowledgements));
                     
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition, 
acknowledgements));
                 });
             }
@@ -886,6 +906,9 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             
acks.complete(Errors.UNKNOWN_SERVER_ERROR.exception());
                         }
                         Map<TopicIdPartition, Acknowledgements> acksMap = 
Map.of(tip, acks);
+                        if (requestData.isRenewAck()) {
+                            
sendShareRenewAcknowledgementsCompleteEvent(nodeAcknowledgementsInFlight);
+                        }
                         maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
                     }
                 }
@@ -917,7 +940,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                     if (partitionData.errorCode() != Errors.NONE.code()) {
                         
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
                     }
-                    acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forCode(partitionData.errorCode()));
+                    acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forCode(partitionData.errorCode()), requestData.isRenewAck());
                 }));
 
                 
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
@@ -944,7 +967,8 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             return;
                         }
 
-                        handlePartitionError(partitionData, 
partitionsWithUpdatedLeaderInfo, acknowledgeRequestState, partitionError, tip, 
shouldRetry);
+                        handlePartitionError(partitionData, 
partitionsWithUpdatedLeaderInfo, acknowledgeRequestState,
+                            partitionError, tip, shouldRetry, 
requestData.isRenewAck());
                     }));
 
                     processRetryLogic(acknowledgeRequestState, shouldRetry, 
responseCompletionTimeMs);
@@ -990,7 +1014,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 }
 
                 
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
-                acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forException(error));
+                acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forException(error), requestData.isRenewAck());
             }));
 
             acknowledgeRequestState.processingComplete();
@@ -1010,7 +1034,8 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                                       AcknowledgeRequestState 
acknowledgeRequestState,
                                       Errors partitionError,
                                       TopicIdPartition tip,
-                                      AtomicBoolean shouldRetry) {
+                                      AtomicBoolean shouldRetry,
+                                      boolean isRenewAck) {
         if (partitionError.exception() != null) {
             boolean retry = false;
             if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || 
partitionError == Errors.FENCED_LEADER_EPOCH || partitionError == 
Errors.UNKNOWN_TOPIC_OR_PARTITION) {
@@ -1029,10 +1054,10 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 }
             } else {
                 
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
-                acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
partitionError);
+                acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
partitionError, isRenewAck);
             }
         } else {
-            acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
partitionError);
+            acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
partitionError, isRenewAck);
         }
     }
 
@@ -1058,7 +1083,8 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
 
         log.debug("For {}, received error {}, with leaderIdAndEpoch {} in 
ShareAcknowledge", tp, partitionError, partitionData.currentLeader());
         if (partitionData.currentLeader().leaderId() != -1 && 
partitionData.currentLeader().leaderEpoch() != -1) {
-            partitionsWithUpdatedLeaderInfo.put(tp, new 
Metadata.LeaderIdAndEpoch(
+            partitionsWithUpdatedLeaderInfo.put(tp,
+                new Metadata.LeaderIdAndEpoch(
                     Optional.of(partitionData.currentLeader().leaderId()),
                     Optional.of(partitionData.currentLeader().leaderEpoch())
             ));
@@ -1186,7 +1212,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             }
 
             Map<TopicIdPartition, Acknowledgements> 
finalAcknowledgementsToSend = new HashMap<>(
-                    incompleteAcknowledgements.isEmpty() ? 
acknowledgementsToSend : incompleteAcknowledgements);
+                incompleteAcknowledgements.isEmpty() ? acknowledgementsToSend 
: incompleteAcknowledgements);
 
             for (Map.Entry<TopicIdPartition, Acknowledgements> entry : 
finalAcknowledgementsToSend.entrySet()) {
                 sessionHandler.addPartitionToFetch(entry.getKey(), 
entry.getValue());
@@ -1198,7 +1224,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             Node nodeToSend = metadata.fetch().nodeById(nodeId);
 
             if (requestBuilder == null) {
-                handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND);
+                handleAcknowledgeShareSessionNotFound();
                 return null;
             } else if (nodeToSend != null) {
                 nodesWithPendingRequests.add(nodeId);
@@ -1255,8 +1281,8 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
 
         boolean isEmpty() {
             return acknowledgementsToSend.isEmpty() &&
-                    incompleteAcknowledgements.isEmpty() &&
-                    inFlightAcknowledgements.isEmpty();
+                incompleteAcknowledgements.isEmpty() &&
+                inFlightAcknowledgements.isEmpty();
         }
 
         /**
@@ -1274,11 +1300,11 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
          * Sets the error code in the acknowledgements and sends the response
          * through a background event.
          */
-        void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors 
acknowledgeErrorCode) {
+        void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors 
acknowledgeErrorCode, boolean isRenewAck) {
             Acknowledgements acks = inFlightAcknowledgements.remove(tip);
             if (acks != null) {
                 acks.complete(acknowledgeErrorCode.exception());
-                resultHandler.complete(tip, acks, requestType);
+                resultHandler.complete(tip, acks, requestType, isRenewAck);
             } else {
                 log.error("Invalid partition {} received in ShareAcknowledge 
response", tip);
             }
@@ -1292,24 +1318,28 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             Acknowledgements acks = incompleteAcknowledgements.get(tip);
             if (acks != null) {
                 acks.complete(Errors.REQUEST_TIMED_OUT.exception());
-                resultHandler.complete(tip, acks, requestType);
+                // We do not know whether this is a renew ack, but handling 
the error as if it were, will ensure
+                // that we do not leave dangling acknowledgements
+                resultHandler.complete(tip, acks, requestType, true);
             }
         }
 
         /**
          * Set the error code for all remaining acknowledgements in the event
-         * of a session error which prevents the remaining acknowledgements 
from
+         * of a share session not found error which prevents the remaining 
acknowledgements from
          * being sent.
          */
-        void handleSessionErrorCode(Errors errorCode) {
+        void handleAcknowledgeShareSessionNotFound() {
             Map<TopicIdPartition, Acknowledgements> acknowledgementsMapToClear 
=
-                    incompleteAcknowledgements.isEmpty() ? 
acknowledgementsToSend : incompleteAcknowledgements;
+                incompleteAcknowledgements.isEmpty() ? acknowledgementsToSend 
: incompleteAcknowledgements;
 
             acknowledgementsMapToClear.forEach((tip, acks) -> {
                 if (acks != null) {
-                    acks.complete(errorCode.exception());
+                    acks.complete(Errors.SHARE_SESSION_NOT_FOUND.exception());
                 }
-                resultHandler.complete(tip, acks, requestType);
+                // We do not know whether this is a renew ack, but handling 
the error as if it were, will ensure
+                // that we do not leave dangling acknowledgements
+                resultHandler.complete(tip, acks, requestType, true);
             });
             acknowledgementsMapToClear.clear();
             processingComplete();
@@ -1335,7 +1365,9 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             if (!inFlightAcknowledgements.isEmpty()) {
                 inFlightAcknowledgements.forEach((partition, acknowledgements) 
-> {
                     acknowledgements.complete(exception);
-                    resultHandler.complete(partition, acknowledgements, 
requestType);
+                    // We do not know whether this is a renew ack, but 
handling the error as if it were, will ensure
+                    // that we do not leave dangling acknowledgements
+                    resultHandler.complete(partition, acknowledgements, 
requestType, true);
                 });
                 inFlightAcknowledgements.clear();
             }
@@ -1404,9 +1436,12 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
          * Handle the result of a ShareAcknowledge request sent to one or more 
nodes and
          * signal the completion when all results are known.
          */
-        public void complete(TopicIdPartition partition, Acknowledgements 
acknowledgements, AcknowledgeRequestType type) {
+        public void complete(TopicIdPartition partition, Acknowledgements 
acknowledgements, AcknowledgeRequestType type, boolean isRenewAck) {
             if (type.equals(AcknowledgeRequestType.COMMIT_ASYNC)) {
                 if (acknowledgements != null) {
+                    if (isRenewAck) {
+                        
sendShareRenewAcknowledgementsCompleteEvent(Map.of(partition, 
acknowledgements));
+                    }
                     
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition, 
acknowledgements));
                 }
             } else {
@@ -1414,6 +1449,9 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                     result.put(partition, acknowledgements);
                 }
                 if (remainingResults != null && 
remainingResults.decrementAndGet() == 0) {
+                    if (isRenewAck) {
+                        sendShareRenewAcknowledgementsCompleteEvent(result);
+                    }
                     maybeSendShareAcknowledgeCommitCallbackEvent(result);
                     future.ifPresent(future -> future.complete(result));
                 }
@@ -1486,14 +1524,6 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             this.partitionIndex = partitionIndex;
         }
 
-        int getPartitionIndex() {
-            return partitionIndex;
-        }
-
-        Uuid getTopicId() {
-            return topicId;
-        }
-
         @Override
         public int hashCode() {
             return Objects.hash(topicId, partitionIndex);
@@ -1505,7 +1535,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             if (o == null || getClass() != o.getClass()) return false;
             IdAndPartition that = (IdAndPartition) o;
             return Objects.equals(topicId, that.topicId) &&
-                    partitionIndex == that.partitionIndex;
+                partitionIndex == that.partitionIndex;
         }
     }
 
@@ -1525,8 +1555,4 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             return super.toString().toLowerCase(Locale.ROOT);
         }
     }
-
-    Map<TopicIdPartition, Acknowledgements> 
getFetchAcknowledgementsToSend(Integer nodeId) {
-        return fetchAcknowledgementsToSend.get(nodeId);
-    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index af925b316da..a2274b8e9cd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -45,6 +45,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCo
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
 import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -146,6 +147,10 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                     process((ShareAcknowledgementCommitCallbackEvent) event);
                     break;
 
+                case SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE:
+                    process((ShareRenewAcknowledgementsCompleteEvent) event);
+                    break;
+
                 default:
                     throw new IllegalArgumentException("Background event type 
" + event.type() + " was not expected");
             }
@@ -160,6 +165,10 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 completedAcknowledgements.add(event.acknowledgementsMap());
             }
         }
+
+        private void process(final ShareRenewAcknowledgementsCompleteEvent 
event) {
+            currentFetch.renew(event.acknowledgementsMap());
+        }
     }
 
     private final ApplicationEventHandler applicationEventHandler;
@@ -576,6 +585,9 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             // If using implicit acknowledgement, acknowledge the previously 
fetched records
             acknowledgeBatchIfImplicitAcknowledgement();
 
+            // If using explicit acknowledgement, make sure all in-flight 
records have been acknowledged
+            ensureInFlightAcknowledgedIfExplicitAcknowledgement();
+
             kafkaShareConsumerMetrics.recordPollStart(timer.currentTimeMs());
 
             if (subscriptions.hasNoSubscriptionOrUserAssignment()) {
@@ -654,39 +666,55 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     }
 
     private ShareFetch<K, V> collect(Map<TopicIdPartition, 
NodeAcknowledgements> acknowledgementsMap) {
-        if (currentFetch.isEmpty()) {
+        Map<TopicIdPartition, NodeAcknowledgements> acksToSend = 
acknowledgementsMap;
+
+        if (currentFetch.isEmpty() && !currentFetch.hasRenewals()) {
             final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
             if (fetch.isEmpty()) {
+                // Check for any acknowledgements which could have come from 
control records (GAP) and send them.
                 Map<TopicIdPartition, NodeAcknowledgements> 
controlRecordAcknowledgements = fetch.takeAcknowledgedRecords();
-
                 if (!controlRecordAcknowledgements.isEmpty()) {
                     // Asynchronously commit any waiting acknowledgements from 
control records.
                     
sendShareAcknowledgeAsyncEvent(controlRecordAcknowledgements);
                 }
+
                 // We only send one ShareFetchEvent per poll call.
                 if (shouldSendShareFetchEvent) {
-                    // Check for any acknowledgements which could have come 
from control records (GAP) and include them.
-                    applicationEventHandler.add(new 
ShareFetchEvent(acknowledgementsMap));
+                    applicationEventHandler.add(new 
ShareFetchEvent(acksToSend));
                     shouldSendShareFetchEvent = false;
                     // Notify the network thread to wake up and start the next 
round of fetching
                     applicationEventHandler.wakeupNetworkThread();
+                    acksToSend = Map.of();
                 }
-            } else if (!acknowledgementsMap.isEmpty()) {
-                // Asynchronously commit any waiting acknowledgements
-                sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
             }
-            return fetch;
-        } else {
-            if (!acknowledgementsMap.isEmpty()) {
+
+            if (!acksToSend.isEmpty()) {
                 // Asynchronously commit any waiting acknowledgements
-                sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
+                sendShareAcknowledgeAsyncEvent(acksToSend);
             }
-            if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
-                // We cannot leave unacknowledged records in EXPLICIT 
acknowledgement mode, so we throw an exception to the application.
-                throw new IllegalStateException("All records must be 
acknowledged in explicit acknowledgement mode.");
+            return fetch;
+        } else if (currentFetch.hasRenewals()) {
+            // First, take any records which have been renewed and move them 
back into in-flight records.
+            currentFetch.takeRenewedRecords();
+
+            // If some records are in renewing state...
+            if (currentFetch.hasRenewals()) {
+                // We only send one ShareFetchEvent per poll call.
+                if (shouldSendShareFetchEvent) {
+                    applicationEventHandler.add(new 
ShareFetchEvent(acksToSend));
+                    shouldSendShareFetchEvent = false;
+                    // Notify the network thread to wake up and start the next 
round of fetching
+                    applicationEventHandler.wakeupNetworkThread();
+                    acksToSend = Map.of();
+                }
             }
-            return currentFetch;
         }
+
+        if (!acksToSend.isEmpty()) {
+            // Asynchronously commit any waiting acknowledgements
+            sendShareAcknowledgeAsyncEvent(acksToSend);
+        }
+        return currentFetch;
     }
 
     private void sendShareAcknowledgeAsyncEvent(Map<TopicIdPartition, 
NodeAcknowledgements> acknowledgementsMap) {
@@ -1107,6 +1135,18 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         }
     }
 
+    /**
+     * If the acknowledgement mode is EXPLICIT, ensure that all in-flight 
records have been acknowledged.
+     */
+    private void ensureInFlightAcknowledgedIfExplicitAcknowledgement() {
+        if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
+            if (!currentFetch.checkAllInFlightAreAcknowledged()) {
+                // We cannot leave unacknowledged records in EXPLICIT 
acknowledgement mode, so we throw an exception to the application.
+                throw new IllegalStateException("All records must be 
acknowledged in explicit acknowledgement mode.");
+            }
+        }
+    }
+
     /**
      * Returns any ready acknowledgements to be sent to the cluster.
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
index 406110fe502..6eb3ba7d77d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
@@ -89,7 +89,9 @@ public class ShareFetch<K, V> {
                 Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry = 
iterator.next();
                 ShareInFlightBatch<K, V> batch = entry.getValue();
                 if (batch.isEmpty()) {
-                    iterator.remove();
+                    if (!batch.hasRenewals()) {
+                        iterator.remove();
+                    }
                 } else {
                     numRecords += batch.numRecords();
                 }
@@ -106,6 +108,29 @@ public class ShareFetch<K, V> {
         return numRecords() == 0;
     }
 
+    /**
+     * @return {@code true} if this fetch contains records being renewed
+     */
+    public boolean hasRenewals() {
+        boolean hasRenewals = false;
+        for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : 
batches.entrySet()) {
+            if (entry.getValue().hasRenewals()) {
+                hasRenewals = true;
+                break;
+            }
+        }
+        return hasRenewals;
+    }
+
+    /**
+     * Take any renewed records and move them back into in-flight state.
+     */
+    public void takeRenewedRecords() {
+        for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : 
batches.entrySet()) {
+            entry.getValue().takeRenewals();
+        }
+    }
+
     /**
      * Acknowledge a single record in the current batch.
      *
@@ -124,7 +149,9 @@ public class ShareFetch<K, V> {
     }
 
     /**
-     * Acknowledge a single record by its topic, partition and offset in the 
current batch.
+     * Acknowledge a single record which experienced an exception during its 
delivery by its topic, partition
+     * and offset in the current batch. This method is specifically for 
overriding the default acknowledge
+     * type for records whose delivery failed.
      *
      * @param topic     The topic of the record to acknowledge
      * @param partition The partition of the record
@@ -156,6 +183,23 @@ public class ShareFetch<K, V> {
         batches.forEach((tip, batch) -> batch.acknowledgeAll(type));
     }
 
+    /**
+     * Checks whether all in-flight records have been acknowledged. This is 
required for explicit
+     * acknowledgement mode.
+     *
+     * @return Whether all in-flight records have been acknowledged
+     */
+    public boolean checkAllInFlightAreAcknowledged() {
+        boolean allInFlightAreAcknowledged = true;
+        for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : 
batches.entrySet()) {
+            if (!entry.getValue().checkAllInFlightAreAcknowledged()) {
+                allInFlightAreAcknowledged = false;
+                break;
+            }
+        }
+        return allInFlightAreAcknowledged;
+    }
+
     /**
      * Removes all acknowledged records from the in-flight records and returns 
the map of acknowledgements
      * to send. If some records were not acknowledged, the in-flight records 
will not be empty after this
@@ -173,4 +217,21 @@ public class ShareFetch<K, V> {
         });
         return acknowledgementMap;
     }
+
+    /**
+     * Handles completed renew acknowledgements by returning successfully 
renewed records
+     * to the set of in-flight records.
+     *
+     * @param acknowledgementsMap Map from topic-partition to acknowledgements 
for
+     *                            completed renew acknowledgements
+     *
+     * @return The number of records renewed
+     */
+    public int renew(Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap) {
+        int recordsRenewed = 0;
+        for (Map.Entry<TopicIdPartition, Acknowledgements> entry : 
acknowledgementsMap.entrySet()) {
+            recordsRenewed += 
batches.get(entry.getKey()).renew(entry.getValue());
+        }
+        return recordsRenewed;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
index 0fa0499aa1f..34051fc4fdb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicIdPartition;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,43 +30,56 @@ import java.util.TreeSet;
 
 public class ShareInFlightBatch<K, V> {
     private final int nodeId;
-    final TopicIdPartition partition;
+    private final TopicIdPartition partition;
     private final Map<Long, ConsumerRecord<K, V>> inFlightRecords;
+    private Map<Long, ConsumerRecord<K, V>> renewingRecords;
+    private Map<Long, ConsumerRecord<K, V>> renewedRecords;
     private final Set<Long> acknowledgedRecords;
     private Acknowledgements acknowledgements;
     private ShareInFlightBatchException exception;
     private boolean hasCachedException = false;
+    private boolean checkForRenewAcknowledgements = false;
 
     public ShareInFlightBatch(int nodeId, TopicIdPartition partition) {
         this.nodeId = nodeId;
         this.partition = partition;
-        inFlightRecords = new TreeMap<>();
-        acknowledgedRecords = new TreeSet<>();
-        acknowledgements = Acknowledgements.empty();
+        this.inFlightRecords = new TreeMap<>();
+        this.acknowledgedRecords = new TreeSet<>();
+        this.acknowledgements = Acknowledgements.empty();
     }
 
-    public void addAcknowledgement(long offset, AcknowledgeType 
acknowledgeType) {
-        acknowledgements.add(offset, acknowledgeType);
+    public void addAcknowledgement(long offset, AcknowledgeType type) {
+        acknowledgements.add(offset, type);
+        if (type == AcknowledgeType.RENEW) {
+            checkForRenewAcknowledgements = true;
+        }
     }
 
     public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) 
{
         if (inFlightRecords.get(record.offset()) != null) {
             acknowledgements.add(record.offset(), type);
             acknowledgedRecords.add(record.offset());
+            if (type == AcknowledgeType.RENEW) {
+                checkForRenewAcknowledgements = true;
+            }
             return;
         }
         throw new IllegalStateException("The record cannot be acknowledged.");
     }
 
-    public int acknowledgeAll(AcknowledgeType type) {
-        int recordsAcknowledged = 0;
+    public void acknowledgeAll(AcknowledgeType type) {
         for (Map.Entry<Long, ConsumerRecord<K, V>> entry : 
inFlightRecords.entrySet()) {
             if (acknowledgements.addIfAbsent(entry.getKey(), type)) {
                 acknowledgedRecords.add(entry.getKey());
-                recordsAcknowledged++;
             }
         }
-        return recordsAcknowledged;
+        if (type == AcknowledgeType.RENEW) {
+            checkForRenewAcknowledgements = true;
+        }
+    }
+
+    public boolean checkAllInFlightAreAcknowledged() {
+        return inFlightRecords.size() == acknowledgedRecords.size();
     }
 
     public void addRecord(ConsumerRecord<K, V> record) {
@@ -78,6 +92,9 @@ public class ShareInFlightBatch<K, V> {
 
     public void merge(ShareInFlightBatch<K, V> other) {
         inFlightRecords.putAll(other.inFlightRecords);
+        if (other.checkForRenewAcknowledgements) {
+            checkForRenewAcknowledgements = true;
+        }
     }
 
     List<ConsumerRecord<K, V>> getInFlightRecords() {
@@ -93,6 +110,21 @@ public class ShareInFlightBatch<K, V> {
     }
 
     Acknowledgements takeAcknowledgedRecords() {
+        if (checkForRenewAcknowledgements) {
+            if (renewingRecords == null) {
+                renewingRecords = new HashMap<>();
+            }
+            if (renewedRecords == null) {
+                renewedRecords = new HashMap<>();
+            }
+            Map<Long, AcknowledgeType> ackTypeMap = 
acknowledgements.getAcknowledgementsTypeMap();
+            acknowledgedRecords.forEach(offset -> {
+                if (ackTypeMap.get(offset) == AcknowledgeType.RENEW) {
+                    renewingRecords.put(offset, inFlightRecords.get(offset));
+                }
+            });
+        }
+
         // Usually, all records will be acknowledged, so we can just clear the 
in-flight records leaving
         // an empty batch, which will trigger more fetching
         if (acknowledgedRecords.size() == inFlightRecords.size()) {
@@ -105,9 +137,47 @@ public class ShareInFlightBatch<K, V> {
 
         Acknowledgements currentAcknowledgements = acknowledgements;
         acknowledgements = Acknowledgements.empty();
+        checkForRenewAcknowledgements = false;
         return currentAcknowledgements;
     }
 
+    int renew(Acknowledgements acknowledgements) {
+        int recordsRenewed = 0;
+        boolean isCompletedExceptionally = 
acknowledgements.isCompletedExceptionally();
+        if (acknowledgements.isCompleted()) {
+            Map<Long, AcknowledgeType> ackTypeMap = 
acknowledgements.getAcknowledgementsTypeMap();
+            for (Map.Entry<Long, AcknowledgeType> ackTypeEntry : 
ackTypeMap.entrySet()) {
+                long offset = ackTypeEntry.getKey();
+                AcknowledgeType ackType = ackTypeEntry.getValue();
+                ConsumerRecord<K, V> record = renewingRecords.remove(offset);
+                if (ackType == AcknowledgeType.RENEW) {
+                    if (record != null && !isCompletedExceptionally) {
+                        // The record is moved into renewed state, and will 
then become in-flight later.
+                        renewedRecords.put(offset, record);
+                        recordsRenewed++;
+                    }
+                }
+            }
+        } else {
+            throw new IllegalStateException("Renewing with uncompleted 
acknowledgements");
+        }
+        return recordsRenewed;
+    }
+
+    boolean hasRenewals() {
+        if (renewingRecords == null) {
+            return false;
+        }
+        return !renewingRecords.isEmpty() || !renewedRecords.isEmpty();
+    }
+
+    void takeRenewals() {
+        if (renewedRecords != null) {
+            inFlightRecords.putAll(renewedRecords);
+            renewedRecords.clear();
+        }
+    }
+
     Acknowledgements getAcknowledgements() {
         return acknowledgements;
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
index 6dc56ee26f1..348855a341b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -169,18 +170,39 @@ public class ShareSessionHandler {
         // The replaced topic-partitions need to be removed, and their 
replacements are already added
         removed.addAll(replaced);
 
+        boolean hasRenewAcknowledgements = false;
         Map<TopicIdPartition, 
List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementBatches = new 
HashMap<>();
-        nextAcknowledgements.forEach((partition, acknowledgements) -> 
acknowledgementBatches.put(partition, 
acknowledgements.getAcknowledgementBatches()
-                .stream().map(AcknowledgementBatch::toShareFetchRequest)
-                .collect(Collectors.toList())));
+        if (!nextAcknowledgements.isEmpty()) {
+            for (Map.Entry<TopicIdPartition, Acknowledgements> partitionsAcks 
: nextAcknowledgements.entrySet()) {
+                List<AcknowledgementBatch> partitionAckBatches = 
partitionsAcks.getValue().getAcknowledgementBatches();
+                for (AcknowledgementBatch ackBatch : partitionAckBatches) {
+                    if 
(ackBatch.acknowledgeTypes().contains(AcknowledgeType.RENEW.id)) {
+                        hasRenewAcknowledgements = true;
+                    }
+                    
acknowledgementBatches.computeIfAbsent(partitionsAcks.getKey(), k -> new 
ArrayList<>()).add(ackBatch.toShareFetchRequest());
+                }
+            }
+        }
 
         nextPartitions = new LinkedHashMap<>();
         nextAcknowledgements = new LinkedHashMap<>();
 
-        return ShareFetchRequest.Builder.forConsumer(
+        if (hasRenewAcknowledgements) {
+            // If the request has renew acknowledgements, the ShareFetch is 
only used to send the acknowledgements
+            // and potentially update the share session. The parameters for 
wait time, number of bytes and number of
+            // records are all zero.
+            return ShareFetchRequest.Builder.forConsumer(
+                groupId, nextMetadata, 0,
+                0, 0, 0,
+                0, shareFetchConfig.shareAcquireMode.id, true,
+                added, removed, acknowledgementBatches);
+        } else {
+            return ShareFetchRequest.Builder.forConsumer(
                 groupId, nextMetadata, shareFetchConfig.maxWaitMs,
                 shareFetchConfig.minBytes, shareFetchConfig.maxBytes, 
shareFetchConfig.maxPollRecords,
-                shareFetchConfig.maxPollRecords, 
shareFetchConfig.shareAcquireMode.id, added, removed, acknowledgementBatches);
+                shareFetchConfig.maxPollRecords, 
shareFetchConfig.shareAcquireMode.id, false,
+                added, removed, acknowledgementBatches);
+        }
     }
 
     public ShareAcknowledgeRequest.Builder newShareAcknowledgeBuilder(String 
groupId, ShareFetchConfig shareFetchConfig) {
@@ -191,15 +213,23 @@ public class ShareSessionHandler {
             return null;
         }
 
+        boolean hasRenewAcknowledgements = false;
         Map<TopicIdPartition, 
List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acknowledgementBatches 
= new HashMap<>();
-        nextAcknowledgements.forEach((partition, acknowledgements) ->
-                acknowledgementBatches.put(partition, 
acknowledgements.getAcknowledgementBatches()
-                        
.stream().map(AcknowledgementBatch::toShareAcknowledgeRequest)
-                        .collect(Collectors.toList())));
+        if (!nextAcknowledgements.isEmpty()) {
+            for (Map.Entry<TopicIdPartition, Acknowledgements> partitionsAcks 
: nextAcknowledgements.entrySet()) {
+                List<AcknowledgementBatch> partitionAckBatches = 
partitionsAcks.getValue().getAcknowledgementBatches();
+                for (AcknowledgementBatch ackBatch : partitionAckBatches) {
+                    if 
(ackBatch.acknowledgeTypes().contains(AcknowledgeType.RENEW.id)) {
+                        hasRenewAcknowledgements = true;
+                    }
+                    
acknowledgementBatches.computeIfAbsent(partitionsAcks.getKey(), k -> new 
ArrayList<>()).add(ackBatch.toShareAcknowledgeRequest());
+                }
+            }
+        }
 
         nextAcknowledgements = new LinkedHashMap<>();
 
-        return ShareAcknowledgeRequest.Builder.forConsumer(groupId, 
nextMetadata, acknowledgementBatches);
+        return ShareAcknowledgeRequest.Builder.forConsumer(groupId, 
nextMetadata, hasRenewAcknowledgements, acknowledgementBatches);
     }
 
     private String topicIdPartitionsToLogString(Collection<TopicIdPartition> 
partitions) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 6fa737c7278..9f84df9414d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -29,6 +29,7 @@ public abstract class BackgroundEvent {
         ERROR,
         CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED,
         SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK,
+        SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE,
         STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED,
         STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED,
         STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareRenewAcknowledgementsCompleteEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareRenewAcknowledgementsCompleteEvent.java
new file mode 100644
index 00000000000..1151ac04a52
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareRenewAcknowledgementsCompleteEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.Acknowledgements;
+import org.apache.kafka.common.TopicIdPartition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ShareRenewAcknowledgementsCompleteEvent extends BackgroundEvent {
+
+    private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
+
+    public ShareRenewAcknowledgementsCompleteEvent(Map<TopicIdPartition, 
Acknowledgements> acknowledgementsMap) {
+        super(Type.SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE);
+        this.acknowledgementsMap = new HashMap<>(acknowledgementsMap);
+    }
+
+    public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
+        return acknowledgementsMap;
+    }
+
+    @Override
+    public String toStringBase() {
+        return super.toStringBase() + ", acknowledgementsMap=" + 
acknowledgementsMap;
+    }
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
index 43bec9c5fda..1f5bba66b8c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
@@ -38,7 +38,7 @@ public class ShareAcknowledgeRequest extends AbstractRequest {
             this.data = data;
         }
 
-        public static ShareAcknowledgeRequest.Builder forConsumer(String 
groupId, ShareRequestMetadata metadata,
+        public static ShareAcknowledgeRequest.Builder forConsumer(String 
groupId, ShareRequestMetadata metadata, boolean isRenewAck,
                                                                   
Map<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> 
acknowledgementsMap) {
             ShareAcknowledgeRequestData data = new 
ShareAcknowledgeRequestData();
             data.setGroupId(groupId);
@@ -46,6 +46,7 @@ public class ShareAcknowledgeRequest extends AbstractRequest {
                 data.setMemberId(metadata.memberId().toString());
                 data.setShareSessionEpoch(metadata.epoch());
             }
+            data.setIsRenewAck(isRenewAck);
 
             ShareAcknowledgeRequestData.AcknowledgeTopicCollection ackTopics = 
new ShareAcknowledgeRequestData.AcknowledgeTopicCollection();
             for (Map.Entry<TopicIdPartition, 
List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acknowledgeEntry : 
acknowledgementsMap.entrySet()) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index ec044bca955..53f9373d35f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -45,7 +45,8 @@ public class ShareFetchRequest extends AbstractRequest {
 
         public static Builder forConsumer(String groupId, ShareRequestMetadata 
metadata,
                                           int maxWait, int minBytes, int 
maxBytes, int maxRecords,
-                                          int batchSize, byte 
shareAcquireMode, List<TopicIdPartition> send, List<TopicIdPartition> forget,
+                                          int batchSize, byte 
shareAcquireMode, boolean isRenewAck,
+                                          List<TopicIdPartition> send, 
List<TopicIdPartition> forget,
                                           Map<TopicIdPartition, 
List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
             ShareFetchRequestData data = new ShareFetchRequestData();
             data.setGroupId(groupId);
@@ -63,6 +64,7 @@ public class ShareFetchRequest extends AbstractRequest {
             data.setMaxRecords(maxRecords);
             data.setBatchSize(batchSize);
             data.setShareAcquireMode(shareAcquireMode);
+            data.setIsRenewAck(isRenewAck);
 
             // Build a map of topics to fetch keyed by topic ID, and within 
each a map of partitions keyed by index
             ShareFetchRequestData.FetchTopicCollection fetchTopics = new 
ShareFetchRequestData.FetchTopicCollection();
@@ -147,7 +149,7 @@ public class ShareFetchRequest extends AbstractRequest {
                 }
                 // The v1 only supports ShareAcquireMode.BATCH_OPTIMIZED.
                 if (data.shareAcquireMode() != 
ShareAcquireMode.BATCH_OPTIMIZED.id()) {
-                    throw new UnsupportedVersionException("The v1 ShareFetch 
does not support ShareAcquireMode.RECORD_LIMIT");
+                    throw new UnsupportedVersionException("The v1 ShareFetch 
only supports ShareAcquireMode.BATCH_OPTIMIZED");
                 }
             }
             return new ShareFetchRequest(data, version);
diff --git a/clients/src/main/resources/common/message/ShareFetchRequest.json 
b/clients/src/main/resources/common/message/ShareFetchRequest.json
index afc08813711..1357858cc41 100644
--- a/clients/src/main/resources/common/message/ShareFetchRequest.json
+++ b/clients/src/main/resources/common/message/ShareFetchRequest.json
@@ -43,7 +43,7 @@
     { "name": "BatchSize", "type": "int32", "versions": "1+",
       "about": "The optimal number of records for batches of acquired records 
and acknowledgements." },
     { "name": "ShareAcquireMode", "type": "int8", "versions": "2+", "default": 
"0", "ignorable": true,
-      "about": "The acquire mode to control the fetch behavior: 0 - 
batch-optimized, 1 - record-limit" },
+      "about": "The acquire mode to control the fetch behavior - 
0:batch-optimized,1:record-limit." },
     { "name": "IsRenewAck", "type": "bool", "versions": "2+", "default": 
"false",
       "about": "Whether Renew type acknowledgements present in 
AcknowledgementBatches." },
     { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java
index b6818ab51b5..60836465003 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java
@@ -17,13 +17,17 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.consumer.AcknowledgeType;
+import org.apache.kafka.common.KafkaException;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class AcknowledgementsTest {
@@ -475,7 +479,6 @@ public class AcknowledgementsTest {
         assertEquals(AcknowledgeType.REJECT.id, 
ackList2.get(2).acknowledgeTypes().get(0));
     }
 
-
     @Test
     public void testNoncontiguousGaps() {
         acks.addGap(2L);
@@ -503,4 +506,24 @@ public class AcknowledgementsTest {
         assertEquals(1, ackList2.get(1).acknowledgeTypes().size());
         assertEquals(Acknowledgements.ACKNOWLEDGE_TYPE_GAP, 
ackList2.get(1).acknowledgeTypes().get(0));
     }
+
+    @Test
+    public void testCompleteSuccess() {
+        acks.add(0, AcknowledgeType.RENEW);
+        assertFalse(acks.isCompleted());
+
+        acks.complete(null);
+        assertTrue(acks.isCompleted());
+        assertNull(acks.getAcknowledgeException());
+    }
+
+    @Test
+    public void testCompleteException() {
+        acks.add(0, AcknowledgeType.RENEW);
+        assertFalse(acks.isCompleted());
+
+        acks.complete(new KafkaException());
+        assertTrue(acks.isCompleted());
+        assertNotNull(acks.getAcknowledgeException());
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index 95f16966292..a6a4108189c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -74,8 +74,8 @@ public class ShareCompletedFetchTest {
         int numRecordsPerBatch = 10;
         int numRecords = 20;        // Records for 10-29, in 2 equal batches
         ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                .setRecords(newRecords(startingOffset, numRecordsPerBatch, 2))
-                .setAcquiredRecords(acquiredRecords(startingOffset, 
numRecords));
+            .setRecords(newRecords(startingOffset, numRecordsPerBatch, 2))
+            .setAcquiredRecords(acquiredRecords(startingOffset, numRecords));
 
         Deserializers<String, String> deserializers = newStringDeserializers();
 
@@ -139,8 +139,8 @@ public class ShareCompletedFetchTest {
         long startingOffset = 10L;
         int numRecords = 10;
         ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                .setRecords(newRecords(startingOffset, numRecords + 500))
-                .setAcquiredRecords(acquiredRecords(startingOffset + 500, 
numRecords));
+            .setRecords(newRecords(startingOffset, numRecords + 500))
+            .setAcquiredRecords(acquiredRecords(startingOffset + 500, 
numRecords));
 
         Deserializers<String, String> deserializers = newStringDeserializers();
 
@@ -167,8 +167,9 @@ public class ShareCompletedFetchTest {
         int numRecords = 10;
         Records rawRecords = newTransactionalRecords(numRecords);
         ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                .setRecords(rawRecords)
-                .setAcquiredRecords(acquiredRecords(0L, numRecords));
+            .setRecords(rawRecords)
+            .setAcquiredRecords(acquiredRecords(0L, numRecords));
+
         ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
         try (final Deserializers<String, String> deserializers = 
newStringDeserializers()) {
             ShareInFlightBatch<String, String> batch = 
completedFetch.fetchRecords(deserializers, 10, true);
@@ -184,8 +185,8 @@ public class ShareCompletedFetchTest {
         int startingOffset = 0;
         int numRecords = 10;
         ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                .setRecords(newRecords(startingOffset, numRecords))
-                .setAcquiredRecords(acquiredRecords(0L, 10));
+            .setRecords(newRecords(startingOffset, numRecords))
+            .setAcquiredRecords(acquiredRecords(0L, 10));
 
         try (final Deserializers<String, String> deserializers = 
newStringDeserializers()) {
             ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
@@ -200,7 +201,7 @@ public class ShareCompletedFetchTest {
     @Test
     public void testNoRecordsInFetch() {
         ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                .setPartitionIndex(0);
+            .setPartitionIndex(0);
 
         ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
         try (final Deserializers<String, String> deserializers = 
newStringDeserializers()) {
@@ -219,7 +220,7 @@ public class ShareCompletedFetchTest {
                 Compression.NONE,
                 TimestampType.CREATE_TIME,
                 0);
-             final UUIDSerializer serializer = new UUIDSerializer()) {
+            final UUIDSerializer serializer = new UUIDSerializer()) {
             builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, 
UUID.randomUUID())));
             builder.append(0L, "key".getBytes(), "value".getBytes());
             Headers headers = new RecordHeaders();
@@ -229,9 +230,9 @@ public class ShareCompletedFetchTest {
             Records records = builder.build();
 
             ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                    .setPartitionIndex(0)
-                    .setRecords(records)
-                    .setAcquiredRecords(acquiredRecords(0L, 4));
+                .setPartitionIndex(0)
+                .setRecords(records)
+                .setAcquiredRecords(acquiredRecords(0L, 4));
 
             try (final Deserializers<UUID, UUID> deserializers = 
newUuidDeserializers()) {
                 ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
@@ -301,8 +302,8 @@ public class ShareCompletedFetchTest {
         List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new 
ArrayList<>(acquiredRecords(0L, 3));
         acquiredRecords.addAll(acquiredRecords(6L, 3));
         ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                .setRecords(newRecords(startingOffset, numRecords))
-                .setAcquiredRecords(acquiredRecords);
+            .setRecords(newRecords(startingOffset, numRecords))
+            .setAcquiredRecords(acquiredRecords);
 
         Deserializers<String, String> deserializers = newStringDeserializers();
 
@@ -335,8 +336,8 @@ public class ShareCompletedFetchTest {
         }
 
         ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                .setRecords(newRecords(startingOffset, numRecords))
-                .setAcquiredRecords(acquiredRecords);
+            .setRecords(newRecords(startingOffset, numRecords))
+            .setAcquiredRecords(acquiredRecords);
 
         Deserializers<String, String> deserializers = newStringDeserializers();
 
@@ -366,17 +367,17 @@ public class ShareCompletedFetchTest {
         // Offsets 5-9 will be duplicates
         List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new 
ArrayList<>();
         acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
-                .setFirstOffset(0L)
-                .setLastOffset(9L)
-                .setDeliveryCount((short) 1));
+            .setFirstOffset(0L)
+            .setLastOffset(9L)
+            .setDeliveryCount((short) 1));
         acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
-                .setFirstOffset(5L)
-                .setLastOffset(14L)
-                .setDeliveryCount((short) 2));
+            .setFirstOffset(5L)
+            .setLastOffset(14L)
+            .setDeliveryCount((short) 2));
 
         ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                .setRecords(newRecords(startingOffset, numRecords))
-                .setAcquiredRecords(acquiredRecords);
+            .setRecords(newRecords(startingOffset, numRecords))
+            .setAcquiredRecords(acquiredRecords);
 
         ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
 
@@ -392,25 +393,24 @@ public class ShareCompletedFetchTest {
         
         // Verify first occurrence (offset 5 should have deliveryCount=1 from 
first range)
         ConsumerRecord<String, String> record5 = records.stream()
-                .filter(r -> r.offset() == 5L)
-                .findFirst()
-                .orElse(null);
+            .filter(r -> r.offset() == 5L)
+            .findFirst()
+            .orElse(null);
         assertNotNull(record5);
         assertEquals(Optional.of((short) 1), record5.deliveryCount());
         
         // Verify offset 10 has deliveryCount=2 from second range
         ConsumerRecord<String, String> record10 = records.stream()
-                .filter(r -> r.offset() == 10L)
-                .findFirst()
-                .orElse(null);
+            .filter(r -> r.offset() == 10L)
+            .findFirst()
+            .orElse(null);
         assertNotNull(record10);
         assertEquals(Optional.of((short) 2), record10.deliveryCount());
         
         // Verify all offsets are unique
         Set<Long> offsetSet = new HashSet<>();
         for (ConsumerRecord<String, String> record : records) {
-            assertTrue(offsetSet.add(record.offset()), 
-                    "Duplicate offset found in results: " + record.offset());
+            assertTrue(offsetSet.add(record.offset()), "Duplicate offset found 
in results: " + record.offset());
         }
     }
 
@@ -423,13 +423,13 @@ public class ShareCompletedFetchTest {
         ShareFetchMetricsAggregator shareFetchMetricsAggregator = new 
ShareFetchMetricsAggregator(shareFetchMetricsManager, partitionSet);
 
         return new ShareCompletedFetch(
-                logContext,
-                BufferSupplier.create(),
-                0,
-                TIP,
-                partitionData,
-                shareFetchMetricsAggregator,
-                ApiKeys.SHARE_FETCH.latestVersion());
+            logContext,
+            BufferSupplier.create(),
+            0,
+            TIP,
+            partitionData,
+            shareFetchMetricsAggregator,
+            ApiKeys.SHARE_FETCH.latestVersion());
     }
 
     private static Deserializers<UUID, UUID> newUuidDeserializers() {
@@ -481,9 +481,9 @@ public class ShareCompletedFetchTest {
 
     public static List<ShareFetchResponseData.AcquiredRecords> 
acquiredRecords(long firstOffset, int count) {
         ShareFetchResponseData.AcquiredRecords acquiredRecords = new 
ShareFetchResponseData.AcquiredRecords()
-                .setFirstOffset(firstOffset)
-                .setLastOffset(firstOffset + count - 1)
-                .setDeliveryCount((short) 1);
+            .setFirstOffset(firstOffset)
+            .setLastOffset(firstOffset + count - 1)
+            .setDeliveryCount((short) 1);
         return Collections.singletonList(acquiredRecords);
     }
 
@@ -518,11 +518,11 @@ public class ShareCompletedFetchTest {
                                         int offset,
                                         Time time) {
         MemoryRecords.writeEndTransactionalMarker(buffer,
-                offset,
-                time.milliseconds(),
-                0,
-                PRODUCER_ID,
-                PRODUCER_EPOCH,
-                new EndTransactionMarker(ControlRecordType.COMMIT, 0));
+            offset,
+            time.milliseconds(),
+            0,
+            PRODUCER_ID,
+            PRODUCER_EPOCH,
+            new EndTransactionMarker(ControlRecordType.COMMIT, 0));
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 11d5adb8a0f..d2abe110ceb 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.ShareAcquireMode;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
@@ -130,7 +131,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
+@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity", 
"JavaNCSS"})
 public class ShareConsumeRequestManagerTest {
     private final String topicName = "test";
     private final String topicName2 = "test-2";
@@ -175,6 +176,7 @@ public class ShareConsumeRequestManagerTest {
     private List<ShareFetchResponseData.AcquiredRecords> emptyAcquiredRecords;
     private ShareFetchMetricsRegistry shareFetchMetricsRegistry;
     private List<Map<TopicIdPartition, Acknowledgements>> 
completedAcknowledgements;
+    private HashSet<Long> renewedRecords;
 
     @BeforeEach
     public void setup() {
@@ -182,6 +184,7 @@ public class ShareConsumeRequestManagerTest {
         acquiredRecords = ShareCompletedFetchTest.acquiredRecords(1L, 3);
         emptyAcquiredRecords = new ArrayList<>();
         completedAcknowledgements = new LinkedList<>();
+        renewedRecords = new HashSet<>();
     }
 
     private void assignFromSubscribed(Set<TopicPartition> partitions) {
@@ -482,16 +485,16 @@ public class ShareConsumeRequestManagerTest {
         ShareConsumeRequestManager.ResultHandler resultHandler = 
shareConsumeRequestManager.buildResultHandler(null, Optional.empty());
 
         // Passing null acknowledgements should mean we do not send the 
background event at all.
-        resultHandler.complete(tip0, null, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
+        resultHandler.complete(tip0, null, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false);
         assertEquals(0, completedAcknowledgements.size());
 
         // Setting the request type to COMMIT_SYNC should still not send any 
background event
         // as we have initialized remainingResults to null.
-        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
+        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
         assertEquals(0, completedAcknowledgements.size());
 
         // Sending non-null acknowledgements means we do send the background 
event
-        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
+        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false);
         assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
     }
 
@@ -511,16 +514,16 @@ public class ShareConsumeRequestManagerTest {
         ShareConsumeRequestManager.ResultHandler resultHandler = 
shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
 
         // We only send the background event after all results have been 
completed.
-        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
+        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
         assertEquals(0, completedAcknowledgements.size());
         assertFalse(future.isDone());
 
-        resultHandler.complete(t2ip0, null, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
+        resultHandler.complete(t2ip0, null, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
         assertEquals(0, completedAcknowledgements.size());
         assertFalse(future.isDone());
 
         // After third response is received, we send the background event.
-        resultHandler.complete(tip1, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
+        resultHandler.complete(tip1, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
         assertEquals(1, completedAcknowledgements.size());
         assertEquals(2, completedAcknowledgements.get(0).size());
         assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
@@ -985,6 +988,7 @@ public class ShareConsumeRequestManagerTest {
         // Send acknowledgements via ShareFetch
         shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
         fetchRecords();
+
         // Subscription changes.
         subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
         subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
@@ -1024,6 +1028,7 @@ public class ShareConsumeRequestManagerTest {
         // Send acknowledgements via ShareFetch
         shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
         fetchRecords();
+
         // Subscription changes.
         subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
 
@@ -1092,7 +1097,6 @@ public class ShareConsumeRequestManagerTest {
         // Change the subscription.
         subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
 
-
         // Now we will be sending the request to node1 only as leader for tip1 
is node1.
         // We do not build the request for tip0 as there are no 
acknowledgements to send.
         NetworkClientDelegate.PollResult pollResult = 
shareConsumeRequestManager.sendFetchesReturnPollResult();
@@ -2539,6 +2543,63 @@ public class ShareConsumeRequestManagerTest {
         }
     }
 
+    @Test
+    public void testShareFetchWithRenewAcknowledgement() {
+        buildRequestManager();
+
+        assignFromSubscribed(Collections.singleton(tp0));
+        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
+
+        Acknowledgements acknowledgements = getAcknowledgements(1,
+            AcknowledgeType.RENEW, AcknowledgeType.RENEW, 
AcknowledgeType.RENEW);
+
+        // Reading records from the share fetch buffer.
+        fetchRecords();
+
+        // Piggyback acknowledgements
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
+
+        NetworkClientDelegate.PollResult pollResult = 
shareConsumeRequestManager.sendFetchesReturnPollResult();
+        assertEquals(1, pollResult.unsentRequests.size());
+        ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) 
pollResult.unsentRequests.get(0).requestBuilder();
+        assertTrue(builder.data().isRenewAck());
+
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        assertEquals(3.0,
+            
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());
+
+        assertEquals(0, renewedRecords.size());
+
+        client.prepareResponse(fullFetchResponse(tip0, MemoryRecords.EMPTY, 
List.of(), Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        assertEquals(3, renewedRecords.size());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.isEmpty());
+
+        Acknowledgements acknowledgements2 = getAcknowledgements(1,
+            AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, 
AcknowledgeType.ACCEPT);
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements2)));
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, records, 
acquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+        assertEquals(6.0,
+            
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());
+    }
+
     private ShareFetchResponse fetchResponseWithTopLevelError(TopicIdPartition 
tp, Errors error) {
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitions 
= Map.of(tp,
                 new ShareFetchResponseData.PartitionData()
@@ -2702,7 +2763,7 @@ public class ShareConsumeRequestManagerTest {
                 subscriptions,
                 shareFetchConfig,
                 deserializers);
-        BackgroundEventHandler backgroundEventHandler = new 
TestableBackgroundEventHandler(time, completedAcknowledgements);
+        BackgroundEventHandler backgroundEventHandler = new 
TestableBackgroundEventHandler(time, completedAcknowledgements, renewedRecords);
         shareConsumeRequestManager = spy(new 
TestableShareConsumeRequestManager<>(
                 logContext,
                 groupId,
@@ -2896,16 +2957,22 @@ public class ShareConsumeRequestManagerTest {
 
     private static class TestableBackgroundEventHandler extends 
BackgroundEventHandler {
         List<Map<TopicIdPartition, Acknowledgements>> 
completedAcknowledgements;
+        Set<Long> renewedRecords;
 
-        public TestableBackgroundEventHandler(Time time, 
List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements) {
+        public TestableBackgroundEventHandler(Time time, 
List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements, 
Set<Long> renewedRecords) {
             super(new LinkedBlockingQueue<>(), time, 
mock(AsyncConsumerMetrics.class));
             this.completedAcknowledgements = completedAcknowledgements;
+            this.renewedRecords = renewedRecords;
         }
 
         public void add(BackgroundEvent event) {
             if (event.type() == 
BackgroundEvent.Type.SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK) {
                 ShareAcknowledgementCommitCallbackEvent 
shareAcknowledgementCommitCallbackEvent = 
(ShareAcknowledgementCommitCallbackEvent) event;
                 
completedAcknowledgements.add(shareAcknowledgementCommitCallbackEvent.acknowledgementsMap());
+            } else if (event.type() == 
BackgroundEvent.Type.SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE) {
+                ShareRenewAcknowledgementsCompleteEvent 
shareRenewAcknowledgementsCompleteEvent = 
(ShareRenewAcknowledgementsCompleteEvent) event;
+                
shareRenewAcknowledgementsCompleteEvent.acknowledgementsMap().values().forEach(acks
 ->
+                    acks.getAcknowledgementsTypeMap().forEach((offset, 
ackType) -> renewedRecords.add(offset)));
             }
         }
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 9a6da634c50..6e819a6888a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -30,6 +31,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCo
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
 import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -467,6 +469,91 @@ public class ShareConsumerImplTest {
         assertEquals(2, newRecords.count(), "Should have received 2 new 
records");
     }
 
+    @Test
+    public void testExplicitModeRenewAndAcknowledgeOnPoll() {
+        // Setup consumer with explicit acknowledgement mode
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(
+            mock(ShareFetchBuffer.class),
+            subscriptions,
+            "group-id",
+            "client-id",
+            "explicit");
+
+        // Setup test data
+        String topic = "test-topic";
+        int partition = 0;
+        TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 
partition, topic);
+        ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0, 
tip);
+        batch.addRecord(new ConsumerRecord<>(topic, partition, 0, "key1", 
"value1"));
+        batch.addRecord(new ConsumerRecord<>(topic, partition, 1, "key2", 
"value2"));
+
+        // Setup first fetch to return records
+        ShareFetch<String, String> firstFetch = ShareFetch.empty();
+        firstFetch.add(tip, batch);
+        doReturn(firstFetch)
+            .when(fetchCollector)
+            .collect(any(ShareFetchBuffer.class));
+
+        // Setup subscription
+        List<String> topics = Collections.singletonList(topic);
+        
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
topics);
+        consumer.subscribe(topics);
+
+        // First poll should succeed and return records
+        ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(100));
+        assertEquals(2, records.count(), "Should have received 2 records");
+
+        // Renew the first record and accept the second
+        Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
+        consumer.acknowledge(iterator.next(), AcknowledgeType.RENEW);
+        consumer.acknowledge(iterator.next(), AcknowledgeType.ACCEPT);
+
+        // Second poll should succeed and return the renewed record again
+        records = consumer.poll(Duration.ofMillis(100));
+        assertEquals(0, records.count(), "Should have received 1 record");
+        assertTrue(firstFetch.hasRenewals());
+
+        Acknowledgements acks = Acknowledgements.empty();
+        acks.add(0, AcknowledgeType.RENEW);
+        acks.complete(null);
+        ShareRenewAcknowledgementsCompleteEvent e = new 
ShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));
+        backgroundEventQueue.add(e);
+
+        records = consumer.poll(Duration.ofMillis(100));
+        assertEquals(1, records.count(), "Should have received 1 record");
+        assertFalse(firstFetch.hasRenewals());
+        iterator = records.iterator();
+        ConsumerRecord<String, String> renewedRecord = iterator.next();
+        assertEquals(0, renewedRecord.offset());
+        consumer.acknowledge(renewedRecord);
+
+        // Setup next fetch to return no records
+        doReturn(ShareFetch.empty())
+            .when(fetchCollector)
+            .collect(any(ShareFetchBuffer.class));
+
+        // Third poll should return no records
+        records = consumer.poll(Duration.ofMillis(100));
+        assertTrue(records.isEmpty());
+
+        // Setup next fetch to return new records
+        ShareFetch<String, String> thirdFetch = ShareFetch.empty();
+        ShareInFlightBatch<String, String> newBatch = new 
ShareInFlightBatch<>(2, tip);
+        newBatch.addRecord(new ConsumerRecord<>(topic, partition, 2, "key3", 
"value3"));
+        newBatch.addRecord(new ConsumerRecord<>(topic, partition, 3, "key4", 
"value4"));
+        thirdFetch.add(tip, newBatch);
+
+        // Reset mock to return new records
+        doReturn(thirdFetch)
+            .when(fetchCollector)
+            .collect(any(ShareFetchBuffer.class));
+
+        // Verify that poll succeeds and returns new records
+        ConsumerRecords<String, String> newRecords = 
consumer.poll(Duration.ofMillis(100));
+        assertEquals(2, newRecords.count(), "Should have received 2 new 
records");
+    }
+
     @Test
     public void testCloseWithTopicAuthorizationException() {
         SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index 0667ebde700..d3291983d6e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
@@ -49,6 +51,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -130,6 +133,71 @@ public class ShareFetchCollectorTest {
         assertTrue(completedFetch.isConsumed());
     }
 
+    @Test
+    public void testWithRenew() {
+        int recordCount = DEFAULT_MAX_POLL_RECORDS;
+        buildDependencies();
+        subscribeAndAssign(topicAPartition0);
+
+        ShareCompletedFetch completedFetch = completedFetchBuilder
+            .recordCount(recordCount)
+            .build();
+
+        // Validate that the buffer is empty until after we add the fetch data.
+        assertTrue(fetchBuffer.isEmpty());
+        fetchBuffer.add(List.of(completedFetch));
+        assertFalse(fetchBuffer.isEmpty());
+
+        // Validate that the completed fetch isn't initialized just because we 
add it to the buffer.
+        assertFalse(completedFetch.isInitialized());
+
+        // Fetch the data and validate that we get all the records we want 
back.
+        ShareFetch<String, String> fetch = fetchCollector.collect(fetchBuffer);
+        assertFalse(fetch.isEmpty());
+        assertEquals(recordCount, fetch.numRecords());
+
+        // When we collected the data from the buffer, this will cause the 
completed fetch to get initialized.
+        assertTrue(completedFetch.isInitialized());
+
+        // However, even though we've collected the data, it isn't 
(completely) consumed yet.
+        assertFalse(completedFetch.isConsumed());
+
+        // The buffer is now considered "empty" because our queue is empty.
+        assertTrue(fetchBuffer.isEmpty());
+        assertNull(fetchBuffer.peek());
+        assertNull(fetchBuffer.poll());
+
+        // However, while the queue is "empty", the next-in-line fetch is 
actually still in the buffer.
+        assertNotNull(fetchBuffer.nextInLineFetch());
+
+        assertEquals(500, fetch.numRecords());
+        ConsumerRecord<String, String> record = new 
ConsumerRecord<>(topicAPartition0.topic(), topicAPartition0.partition(), 0, "", 
"");
+        fetch.acknowledge(record, AcknowledgeType.RENEW);
+        assertEquals(DEFAULT_MAX_POLL_RECORDS, fetch.numRecords());
+        assertFalse(fetch.hasRenewals());
+
+        Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = 
fetch.takeAcknowledgedRecords();
+        assertTrue(fetch.hasRenewals());
+        assertEquals(DEFAULT_MAX_POLL_RECORDS - 1, fetch.numRecords());
+
+        Acknowledgements acks = 
acknowledgementsMap.get(topicAPartition0).acknowledgements();
+        acks.complete(null);
+        fetch.renew(Map.of(topicAPartition0, acks));
+        assertTrue(fetch.hasRenewals());
+        fetch.takeRenewedRecords();
+        assertFalse(fetch.hasRenewals());
+        assertEquals(DEFAULT_MAX_POLL_RECORDS, fetch.numRecords());
+
+        // Now attempt to collect more records from the fetch buffer.
+        fetch = fetchCollector.collect(fetchBuffer);
+        assertEquals(0, fetch.numRecords());
+        assertTrue(fetch.isEmpty());
+
+        // However, once we read *past* the end of the records in the 
ShareCompletedFetch, then we will call
+        // drain on it, and it will be considered all consumed.
+        assertTrue(completedFetch.isConsumed());
+    }
+
     @ParameterizedTest
     @MethodSource("testErrorInInitializeSource")
     public void testErrorInInitialize(RuntimeException expectedException) {
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 095fe2956b6..3150e8e28a6 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -623,8 +623,8 @@ public class SharePartitionManager implements AutoCloseable 
{
 
     // Visible for testing.
     void processShareFetch(ShareFetch shareFetch) {
-        if (shareFetch.topicIdPartitions().isEmpty()) {
-            // If there are no partitions to fetch then complete the future 
with an empty map.
+        if (shareFetch.topicIdPartitions().isEmpty() || 
shareFetch.maxFetchRecords() == 0 || shareFetch.fetchParams().maxBytes == 0) {
+            // If there are no partitions or no data requested to fetch then 
complete the future with an empty map.
             shareFetch.maybeComplete(Map.of());
             return;
         }
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e742dbe6bd3..2b96099afcb 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -725,7 +725,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     val send: Seq[TopicIdPartition] = Seq(
       new TopicIdPartition(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), 
new TopicPartition(topic, part)))
     val ackMap = new util.HashMap[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]]
-    requests.ShareFetchRequest.Builder.forConsumer(shareGroup, metadata, 100, 
0, Int.MaxValue, 500, 500, ShareAcquireMode.BATCH_OPTIMIZED.id(),
+    requests.ShareFetchRequest.Builder.forConsumer(shareGroup, metadata, 100, 
0, Int.MaxValue, 500, 500, ShareAcquireMode.BATCH_OPTIMIZED.id(), false,
       send.asJava, Seq.empty.asJava, ackMap).build()
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index b76a19faa7e..9538e2a425f 100644
--- 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -2473,16 +2473,17 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
                                       maxBytes: Int = Int.MaxValue,
                                       maxRecords: Int = 500,
                                       batchSize: Int = 500,
-                                      shareAcquireMode: ShareAcquireMode = 
ShareAcquireMode.BATCH_OPTIMIZED): ShareFetchRequest = {
-    ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, 
minBytes, maxBytes, maxRecords, batchSize, shareAcquireMode.id, send, forget, 
acknowledgementsMap)
+                                      shareAcquireMode: ShareAcquireMode = 
ShareAcquireMode.BATCH_OPTIMIZED,
+                                      isRenewAck: Boolean = false): 
ShareFetchRequest = {
+    ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, 
minBytes, maxBytes, maxRecords, batchSize, shareAcquireMode.id, isRenewAck, 
send, forget, acknowledgementsMap)
       .build()
   }
 
   private def createShareAcknowledgeRequest(groupId: String,
                                             metadata: ShareRequestMetadata,
-                                            acknowledgementsMap: 
util.Map[TopicIdPartition, 
util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]]
-                                           ): ShareAcknowledgeRequest = {
-    ShareAcknowledgeRequest.Builder.forConsumer(groupId, metadata, 
acknowledgementsMap)
+                                            acknowledgementsMap: 
util.Map[TopicIdPartition, 
util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]],
+                                            isRenewAck: Boolean = false): 
ShareAcknowledgeRequest = {
+    ShareAcknowledgeRequest.Builder.forConsumer(groupId, metadata, isRenewAck, 
acknowledgementsMap)
       .build()
   }
 }

Reply via email to