This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch revert-20905-KAFKA-19892-2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4f3ec9db1430325202990140b84a764eacc8ed1f
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Nov 19 18:26:22 2025 +0000

    Revert "KAFKA-19892: Client use of ShareAcknowledge acquisition lock timeout …"
    
    This reverts commit 0491e1124eaa163a7ed7c139ab477c9f4a7b71c9.
---
 .../kafka/clients/consumer/ShareConsumerTest.java  | 19 ------
 .../internals/ShareConsumeRequestManager.java      | 74 ++++++++++------------
 .../consumer/internals/ShareConsumerImpl.java      |  2 +-
 .../clients/consumer/internals/ShareFetch.java     | 14 +---
 .../events/ShareAcknowledgementEvent.java          | 10 +--
 .../internals/ShareConsumeRequestManagerTest.java  | 12 ++--
 .../consumer/internals/ShareConsumerImplTest.java  |  2 +-
 .../internals/ShareFetchCollectorTest.java         |  3 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  5 +-
 9 files changed, 47 insertions(+), 94 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 47b35492d35..62a4441a29a 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -2993,10 +2993,6 @@ public class ShareConsumerTest {
             shareConsumer.subscribe(List.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 10);
             assertEquals(10, records.count());
-            assertEquals(Optional.of(15000), 
shareConsumer.acquisitionLockTimeoutMs());
-
-            // The updated acquisition lock timeout is only applied when the 
next poll is called.
-            alterShareRecordLockDurationMs("group1", 25000);
 
             int count = 0;
             Map<TopicIdPartition, Optional<KafkaException>> result;
@@ -3008,7 +3004,6 @@ public class ShareConsumerTest {
                 }
                 result = shareConsumer.commitSync();
                 assertEquals(1, result.size());
-                assertEquals(Optional.of(15000), 
shareConsumer.acquisitionLockTimeoutMs());
                 assertEquals(Optional.empty(), result.get(new 
TopicIdPartition(tpId, tp.partition(), tp.topic())));
                 count++;
             }
@@ -3016,7 +3011,6 @@ public class ShareConsumerTest {
             // Get the rest of all 5 records.
             records = waitedPoll(shareConsumer, 2500L, 5);
             assertEquals(5, records.count());
-            assertEquals(Optional.of(25000), 
shareConsumer.acquisitionLockTimeoutMs());
             for (ConsumerRecord<byte[], byte[]> record : records) {
                 shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
             }
@@ -3447,19 +3441,6 @@ public class ShareConsumerTest {
         }
     }
 
-    private void alterShareRecordLockDurationMs(String groupId, int newValue) {
-        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, groupId);
-        Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();
-        alterEntries.put(configResource, List.of(new AlterConfigOp(new 
ConfigEntry(
-            GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
Integer.toString(newValue)), AlterConfigOp.OpType.SET)));
-        AlterConfigsOptions alterOptions = new AlterConfigsOptions();
-        try (Admin adminClient = createAdminClient()) {
-            assertDoesNotThrow(() -> 
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
-                .all()
-                .get(60, TimeUnit.SECONDS), "Failed to alter configs");
-        }
-    }
-
     /**
      * Test utility which encapsulates a {@link ShareConsumer} whose record 
processing
      * behavior can be supplied as a function argument.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index db68877556b..ecb18c83975 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -171,7 +171,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             if (nodesWithPendingRequests.contains(node.id())) {
                 log.trace("Skipping fetch for partition {} because previous 
fetch request to {} has not been processed", partition, node.id());
             } else {
-                // If there is a leader and no in-flight requests, issue a new 
fetch.
+                // if there is a leader and no in-flight requests, issue a new 
fetch
                 ShareSessionHandler handler = handlerMap.computeIfAbsent(node,
                         k -> sessionHandlers.computeIfAbsent(node.id(), n -> 
new ShareSessionHandler(logContext, n, memberId)));
 
@@ -234,9 +234,9 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                                 topicNamesMap.putIfAbsent(new 
IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
                                 log.debug("Added fetch request for previously 
subscribed partition {} to node {}", tip, nodeId);
                             } else {
-                                log.debug("Leader for the partition is down or 
has changed, failing acknowledgements for partition {}", tip);
+                                log.debug("Leader for the partition is down or 
has changed, failing Acknowledgements for partition {}", tip);
                                 
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                                maybeSendShareAcknowledgementEvent(Map.of(tip, 
acks), true, Optional.empty());
+                                maybeSendShareAcknowledgementEvent(Map.of(tip, 
acks), true);
                             }
                         });
 
@@ -246,7 +246,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             }
         });
 
-        // Iterate over the share session handlers and build a list of 
UnsentRequests.
+        // Iterate over the share session handlers and build a list of 
UnsentRequests
         List<UnsentRequest> requests = 
handlerMap.entrySet().stream().map(entry -> {
             Node target = entry.getKey();
             ShareSessionHandler handler = entry.getValue();
@@ -302,7 +302,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             // Failing the acknowledgements as we cannot have piggybacked 
acknowledgements in the initial ShareFetchRequest.
             log.debug("Cannot send acknowledgements on initial epoch for 
ShareSession for partition {}", tip);
             
acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
-            maybeSendShareAcknowledgementEvent(Map.of(tip, acknowledgements), 
true, Optional.empty());
+            maybeSendShareAcknowledgementEvent(Map.of(tip, acknowledgements), 
true);
             return false;
         } else {
             metricsManager.recordAcknowledgementSent(acknowledgements.size());
@@ -410,11 +410,9 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         this.isAcknowledgementCommitCallbackRegistered = 
isAcknowledgementCommitCallbackRegistered;
     }
 
-    private void maybeSendShareAcknowledgementEvent(Map<TopicIdPartition, 
Acknowledgements> acknowledgementsMap,
-                                                    boolean 
checkForRenewAcknowledgements,
-                                                    Optional<Integer> 
acquisitionLockTimeoutMs) {
+    private void maybeSendShareAcknowledgementEvent(Map<TopicIdPartition, 
Acknowledgements> acknowledgementsMap, boolean checkForRenewAcknowledgements) {
         if (isAcknowledgementCommitCallbackRegistered || 
checkForRenewAcknowledgements) {
-            ShareAcknowledgementEvent event = new 
ShareAcknowledgementEvent(acknowledgementsMap, checkForRenewAcknowledgements, 
acquisitionLockTimeoutMs);
+            ShareAcknowledgementEvent event = new 
ShareAcknowledgementEvent(acknowledgementsMap, checkForRenewAcknowledgements);
             acknowledgeEventHandler.add(event);
         }
     }
@@ -566,7 +564,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             resultCount.incrementAndGet();
                         } else {
                             
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                            maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()), true, Optional.empty());
+                            maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()), true);
                         }
                     }
                 }
@@ -641,7 +639,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             }
                         } else {
                             
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                            maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()), true, Optional.empty());
+                            maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()), true);
                         }
                     }
                 }
@@ -679,7 +677,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 }
             } else {
                 
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcks.acknowledgements()), true, Optional.empty());
+                maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcks.acknowledgements()), true);
             }
         });
 
@@ -698,7 +696,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             }
                         } else {
                             
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                            maybeSendShareAcknowledgementEvent(Map.of(tip, 
acks), true, Optional.empty());
+                            maybeSendShareAcknowledgementEvent(Map.of(tip, 
acks), true);
                         }
                     });
                 }
@@ -802,14 +800,12 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                         
acks.complete(Errors.forCode(response.error().code()).exception());
                         
metricsManager.recordFailedAcknowledgements(acks.size());
                     });
-                    
maybeSendShareAcknowledgementEvent(nodeAcknowledgementsInFlight, 
requestData.isRenewAck(), Optional.empty());
+                    
maybeSendShareAcknowledgementEvent(nodeAcknowledgementsInFlight, 
requestData.isRenewAck());
                 }
                 return;
             }
 
             final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
responseData = new LinkedHashMap<>();
-            final Optional<Integer> responseAcquisitionLockTimeoutMs = 
response.data().acquisitionLockTimeoutMs() > 0
-                ? Optional.of(response.data().acquisitionLockTimeoutMs()) : 
Optional.empty();
 
             response.data().responses().forEach(topicResponse ->
                 topicResponse.partitions().forEach(partition -> {
@@ -842,7 +838,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                         
acks.complete(Errors.forCode(partitionData.acknowledgeErrorCode())
                                 
.exception(partitionData.acknowledgeErrorMessage()));
                         Map<TopicIdPartition, Acknowledgements> acksMap = 
Map.of(tip, acks);
-                        maybeSendShareAcknowledgementEvent(acksMap, 
requestData.isRenewAck(), responseAcquisitionLockTimeoutMs);
+                        maybeSendShareAcknowledgementEvent(acksMap, 
requestData.isRenewAck());
                     }
                 }
 
@@ -862,7 +858,8 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                         fetchTarget.id(),
                         tip,
                         partitionData,
-                        responseAcquisitionLockTimeoutMs,
+                        response.data().acquisitionLockTimeoutMs() > 0
+                            ? 
Optional.of(response.data().acquisitionLockTimeoutMs()) : Optional.empty(),
                         shareFetchMetricsAggregator,
                         requestVersion)
                 );
@@ -880,7 +877,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             if (fetchAcknowledgementsInFlight.get(fetchTarget.id()) != null) {
                 
fetchAcknowledgementsInFlight.remove(fetchTarget.id()).forEach((partition, 
acknowledgements) -> {
                     acknowledgements.complete(new 
InvalidRecordStateException(INVALID_RESPONSE));
-                    maybeSendShareAcknowledgementEvent(Map.of(partition, 
acknowledgements), true, Optional.empty());
+                    maybeSendShareAcknowledgementEvent(Map.of(partition, 
acknowledgements), true);
                 });
             }
 
@@ -930,7 +927,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             
acks.complete(Errors.UNKNOWN_SERVER_ERROR.exception());
                         }
                         Map<TopicIdPartition, Acknowledgements> acksMap = 
Map.of(tip, acks);
-                        maybeSendShareAcknowledgementEvent(acksMap, 
requestData.isRenewAck(), Optional.empty());
+                        maybeSendShareAcknowledgementEvent(acksMap, 
requestData.isRenewAck());
                     }
                 }
             }));
@@ -951,8 +948,6 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         try {
             log.debug("Completed ShareAcknowledge request from node {} 
successfully", fetchTarget.id());
             ShareAcknowledgeResponse response = (ShareAcknowledgeResponse) 
resp.responseBody();
-            final Optional<Integer> responseAcquisitionLockTimeoutMs = 
response.data().acquisitionLockTimeoutMs() > 0
-                ? Optional.of(response.data().acquisitionLockTimeoutMs()) : 
Optional.empty();
 
             Map<TopicPartition, Metadata.LeaderIdAndEpoch> 
partitionsWithUpdatedLeaderInfo = new HashMap<>();
 
@@ -966,8 +961,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                     if (partitionData.errorCode() != Errors.NONE.code()) {
                         
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
                     }
-
-                    acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forCode(partitionData.errorCode()), requestData.isRenewAck(), 
responseAcquisitionLockTimeoutMs);
+                    acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forCode(partitionData.errorCode()), requestData.isRenewAck());
                 }));
 
                 
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
@@ -986,8 +980,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                     }
                 } else {
                     AtomicBoolean shouldRetry = new AtomicBoolean(false);
-
-                    // Check all partition-level error codes.
+                    // Check all partition level error codes
                     response.data().responses().forEach(topicResponse -> 
topicResponse.partitions().forEach(partitionData -> {
                         Errors partitionError = 
Errors.forCode(partitionData.errorCode());
                         TopicIdPartition tip = 
lookupTopicId(topicResponse.topicId(), partitionData.partitionIndex());
@@ -996,7 +989,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                         }
 
                         handlePartitionError(partitionData, 
partitionsWithUpdatedLeaderInfo, acknowledgeRequestState,
-                            partitionError, tip, shouldRetry, 
requestData.isRenewAck(), responseAcquisitionLockTimeoutMs);
+                            partitionError, tip, shouldRetry, 
requestData.isRenewAck());
                     }));
 
                     processRetryLogic(acknowledgeRequestState, shouldRetry, 
responseCompletionTimeMs);
@@ -1042,7 +1035,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 }
 
                 
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
-                acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forException(error), requestData.isRenewAck(), Optional.empty());
+                acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forException(error), requestData.isRenewAck());
             }));
 
             acknowledgeRequestState.processingComplete();
@@ -1063,8 +1056,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                                       Errors partitionError,
                                       TopicIdPartition tip,
                                       AtomicBoolean shouldRetry,
-                                      boolean isRenewAck,
-                                      Optional<Integer> 
acquisitionLockTimeoutMs) {
+                                      boolean isRenewAck) {
         if (partitionError.exception() != null) {
             boolean retry = false;
             if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || 
partitionError == Errors.FENCED_LEADER_EPOCH || partitionError == 
Errors.UNKNOWN_TOPIC_OR_PARTITION) {
@@ -1083,10 +1075,10 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 }
             } else {
                 
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
-                acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
partitionError, isRenewAck, Optional.empty());
+                acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
partitionError, isRenewAck);
             }
         } else {
-            acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
partitionError, isRenewAck, acquisitionLockTimeoutMs);
+            acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
partitionError, isRenewAck);
         }
     }
 
@@ -1329,11 +1321,11 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
          * Sets the error code in the acknowledgements and sends the response
          * through a background event.
          */
-        void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors 
acknowledgeErrorCode, boolean isRenewAck, Optional<Integer> 
acquisitionLockTimeoutMs) {
+        void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors 
acknowledgeErrorCode, boolean isRenewAck) {
             Acknowledgements acks = inFlightAcknowledgements.remove(tip);
             if (acks != null) {
                 acks.complete(acknowledgeErrorCode.exception());
-                resultHandler.complete(tip, acks, requestType, isRenewAck, 
acquisitionLockTimeoutMs);
+                resultHandler.complete(tip, acks, requestType, isRenewAck);
             } else {
                 log.error("Invalid partition {} received in ShareAcknowledge 
response", tip);
             }
@@ -1349,7 +1341,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 acks.complete(Errors.REQUEST_TIMED_OUT.exception());
                 // We do not know whether this is a renew ack, but handling 
the error as if it were, will ensure
                 // that we do not leave dangling acknowledgements
-                resultHandler.complete(tip, acks, requestType, true, 
Optional.empty());
+                resultHandler.complete(tip, acks, requestType, true);
             }
         }
 
@@ -1368,7 +1360,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 }
                 // We do not know whether this is a renew ack, but handling 
the error as if it were, will ensure
                 // that we do not leave dangling acknowledgements
-                resultHandler.complete(tip, acks, requestType, true, 
Optional.empty());
+                resultHandler.complete(tip, acks, requestType, true);
             });
             acknowledgementsMapToClear.clear();
             processingComplete();
@@ -1396,7 +1388,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                     acknowledgements.complete(exception);
                     // We do not know whether this is a renew ack, but 
handling the error as if it were, will ensure
                     // that we do not leave dangling acknowledgements
-                    resultHandler.complete(partition, acknowledgements, 
requestType, true, Optional.empty());
+                    resultHandler.complete(partition, acknowledgements, 
requestType, true);
                 });
                 inFlightAcknowledgements.clear();
             }
@@ -1440,7 +1432,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
     }
 
     /**
-     * Sends a ShareAcknowledgementEvent event to the application when it is 
done
+     * Sends a ShareAcknowledgeCommitCallback event to the application when it 
is done
      * processing all the remaining acknowledgement request states.
      * Also manages completing the future for synchronous acknowledgement 
commit by counting
      * down the results as they are known and completing the future at the end.
@@ -1465,17 +1457,17 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
          * Handle the result of a ShareAcknowledge request sent to one or more 
nodes and
          * signal the completion when all results are known.
          */
-        public void complete(TopicIdPartition partition, Acknowledgements 
acknowledgements, AcknowledgeRequestType type, boolean isRenewAck, 
Optional<Integer> acquisitionLockTimeoutMs) {
+        public void complete(TopicIdPartition partition, Acknowledgements 
acknowledgements, AcknowledgeRequestType type, boolean isRenewAck) {
             if (type.equals(AcknowledgeRequestType.COMMIT_ASYNC)) {
                 if (acknowledgements != null) {
-                    maybeSendShareAcknowledgementEvent(Map.of(partition, 
acknowledgements), isRenewAck, acquisitionLockTimeoutMs);
+                    maybeSendShareAcknowledgementEvent(Map.of(partition, 
acknowledgements), isRenewAck);
                 }
             } else {
                 if (acknowledgements != null) {
                     result.put(partition, acknowledgements);
                 }
                 if (remainingResults != null && 
remainingResults.decrementAndGet() == 0) {
-                    maybeSendShareAcknowledgementEvent(result, isRenewAck, 
acquisitionLockTimeoutMs);
+                    maybeSendShareAcknowledgementEvent(result, isRenewAck);
                     future.ifPresent(future -> future.complete(result));
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 3a7eb81df60..6b97035c9c4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -138,7 +138,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 completedAcknowledgements.add(event.acknowledgementsMap());
             }
             if (event.checkForRenewAcknowledgements()) {
-                currentFetch.renew(event.acknowledgementsMap(), 
event.acquisitionLockTimeoutMs());
+                currentFetch.renew(event.acknowledgementsMap());
             }
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
index 83a43cb69ae..cd5203524ab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
@@ -43,7 +43,6 @@ import java.util.Optional;
 public class ShareFetch<K, V> {
     private final Map<TopicIdPartition, ShareInFlightBatch<K, V>> batches;
     private Optional<Integer> acquisitionLockTimeoutMs;
-    private Optional<Integer> acquisitionLockTimeoutMsRenewed;
 
     public static <K, V> ShareFetch<K, V> empty() {
         return new ShareFetch<>(new HashMap<>(), Optional.empty());
@@ -52,7 +51,6 @@ public class ShareFetch<K, V> {
     private ShareFetch(Map<TopicIdPartition, ShareInFlightBatch<K, V>> 
batches, Optional<Integer> acquisitionLockTimeoutMs) {
         this.batches = batches;
         this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
-        this.acquisitionLockTimeoutMsRenewed = Optional.empty();
     }
 
     /**
@@ -144,10 +142,6 @@ public class ShareFetch<K, V> {
         for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : 
batches.entrySet()) {
             entry.getValue().takeRenewals();
         }
-        // Any acquisition lock timeout updated by renewal is applied as the 
renewed records are move back to in-flight
-        if (acquisitionLockTimeoutMsRenewed.isPresent()) {
-            acquisitionLockTimeoutMs = acquisitionLockTimeoutMsRenewed;
-        }
     }
 
     /**
@@ -241,18 +235,16 @@ public class ShareFetch<K, V> {
      * Handles completed renew acknowledgements by returning successfully 
renewed records
      * to the set of in-flight records.
      *
-     * @param acknowledgementsMap      Map from topic-partition to 
acknowledgements for
-     *                                 completed renew acknowledgements
-     * @param acquisitionLockTimeoutMs Optional updated acquisition lock 
timeout
+     * @param acknowledgementsMap Map from topic-partition to acknowledgements 
for
+     *                            completed renew acknowledgements
      *
      * @return The number of records renewed
      */
-    public int renew(Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap, Optional<Integer> acquisitionLockTimeoutMs) {
+    public int renew(Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap) {
         int recordsRenewed = 0;
         for (Map.Entry<TopicIdPartition, Acknowledgements> entry : 
acknowledgementsMap.entrySet()) {
             recordsRenewed += 
batches.get(entry.getKey()).renew(entry.getValue());
         }
-        acquisitionLockTimeoutMsRenewed = acquisitionLockTimeoutMs;
         return recordsRenewed;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
index ff7f7598f68..9dfc06b1f42 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
@@ -21,7 +21,6 @@ import 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
 import org.apache.kafka.common.TopicIdPartition;
 
 import java.util.Map;
-import java.util.Optional;
 
 /**
  * This is the class of events created by the {@link ConsumerNetworkThread 
network thread} to indicate completion
@@ -31,14 +30,11 @@ public class ShareAcknowledgementEvent {
 
     private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
     private final boolean checkForRenewAcknowledgements;
-    private final Optional<Integer> acquisitionLockTimeoutMs;
 
     public ShareAcknowledgementEvent(Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap,
-                                     boolean checkForRenewAcknowledgements,
-                                     Optional<Integer> 
acquisitionLockTimeoutMs) {
+                                     boolean checkForRenewAcknowledgements) {
         this.acknowledgementsMap = acknowledgementsMap;
         this.checkForRenewAcknowledgements = checkForRenewAcknowledgements;
-        this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
     }
 
     public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
@@ -48,8 +44,4 @@ public class ShareAcknowledgementEvent {
     public boolean checkForRenewAcknowledgements() {
         return checkForRenewAcknowledgements;
     }
-
-    public Optional<Integer> acquisitionLockTimeoutMs() {
-        return acquisitionLockTimeoutMs;
-    }
 }
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index dc05257bd6e..3064f5125c7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -485,16 +485,16 @@ public class ShareConsumeRequestManagerTest {
         ShareConsumeRequestManager.ResultHandler resultHandler = 
shareConsumeRequestManager.buildResultHandler(null, Optional.empty());
 
         // Passing null acknowledgements should mean we do not send the 
background event at all.
-        resultHandler.complete(tip0, null, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false, 
Optional.empty());
+        resultHandler.complete(tip0, null, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false);
         assertEquals(0, completedAcknowledgements.size());
 
         // Setting the request type to COMMIT_SYNC should still not send any 
background event
         // as we have initialized remainingResults to null.
-        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false, 
Optional.empty());
+        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
         assertEquals(0, completedAcknowledgements.size());
 
         // Sending non-null acknowledgements means we do send the background 
event
-        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false, 
Optional.empty());
+        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false);
         assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
     }
 
@@ -514,16 +514,16 @@ public class ShareConsumeRequestManagerTest {
         ShareConsumeRequestManager.ResultHandler resultHandler = 
shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
 
         // We only send the background event after all results have been 
completed.
-        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false, 
Optional.empty());
+        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
         assertEquals(0, completedAcknowledgements.size());
         assertFalse(future.isDone());
 
-        resultHandler.complete(t2ip0, null, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false, 
Optional.empty());
+        resultHandler.complete(t2ip0, null, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
         assertEquals(0, completedAcknowledgements.size());
         assertFalse(future.isDone());
 
         // After third response is received, we send the background event.
-        resultHandler.complete(tip1, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false, 
Optional.empty());
+        resultHandler.complete(tip1, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
         assertEquals(1, completedAcknowledgements.size());
         assertEquals(2, completedAcknowledgements.get(0).size());
         assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 6943c39f060..188a7066981 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -539,7 +539,7 @@ public class ShareConsumerImplTest {
         Acknowledgements acks = Acknowledgements.empty();
         acks.add(0, AcknowledgeType.RENEW);
         acks.complete(null);
-        ShareAcknowledgementEvent e = new 
ShareAcknowledgementEvent(Map.of(tip, acks), true, Optional.empty());
+        ShareAcknowledgementEvent e = new 
ShareAcknowledgementEvent(Map.of(tip, acks), true);
         acknowledgementEventQueue.add(e);
 
         records = consumer.poll(Duration.ofMillis(100));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index e404d7b3e22..ebc85ed493b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -184,12 +184,11 @@ public class ShareFetchCollectorTest {
 
         Acknowledgements acks = 
acknowledgementsMap.get(topicAPartition0).acknowledgements();
         acks.complete(null);
-        fetch.renew(Map.of(topicAPartition0, acks), Optional.of(20000));
+        fetch.renew(Map.of(topicAPartition0, acks));
         assertTrue(fetch.hasRenewals());
         fetch.takeRenewedRecords();
         assertFalse(fetch.hasRenewals());
         assertEquals(DEFAULT_MAX_POLL_RECORDS, fetch.numRecords());
-        assertEquals(Optional.of(20000), fetch.acquisitionLockTimeoutMs());
 
         // Now attempt to collect more records from the fetch buffer.
         fetch = fetchCollector.collect(fetchBuffer);
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6542831859a..b18f14794f1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -4014,9 +4014,6 @@ class KafkaApis(val requestChannel: RequestChannel,
   // the callback for processing a share acknowledge response, invoked before 
throttling
   def processShareAcknowledgeResponse(responseAcknowledgeData: 
Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
                                       request: RequestChannel.Request): 
ShareAcknowledgeResponse = {
-    val shareAcknowledgeRequest = request.body[ShareAcknowledgeRequest]
-    val groupId = shareAcknowledgeRequest.data.groupId
-
     val partitions = new util.LinkedHashMap[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData]
     val nodeEndpoints = new mutable.HashMap[Int, Node]
     responseAcknowledgeData.foreach{ case(tp, partitionData) =>
@@ -4039,7 +4036,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       0,
       partitions,
       nodeEndpoints.values.toList.asJava,
-      ShareFetchUtils.recordLockDurationMsOrDefault(groupConfigManager, 
groupId, config.shareGroupConfig.shareGroupRecordLockDurationMs)
+      config.shareGroupConfig.shareGroupRecordLockDurationMs
     )
   }
 


Reply via email to