This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch revert-20905-KAFKA-19892-2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4f3ec9db1430325202990140b84a764eacc8ed1f Author: Andrew Schofield <[email protected]> AuthorDate: Wed Nov 19 18:26:22 2025 +0000 Revert "KAFKA-19892: Client use of ShareAcknowledge acquisition lock timeout …" This reverts commit 0491e1124eaa163a7ed7c139ab477c9f4a7b71c9. --- .../kafka/clients/consumer/ShareConsumerTest.java | 19 ------ .../internals/ShareConsumeRequestManager.java | 74 ++++++++++------------ .../consumer/internals/ShareConsumerImpl.java | 2 +- .../clients/consumer/internals/ShareFetch.java | 14 +--- .../events/ShareAcknowledgementEvent.java | 10 +-- .../internals/ShareConsumeRequestManagerTest.java | 12 ++-- .../consumer/internals/ShareConsumerImplTest.java | 2 +- .../internals/ShareFetchCollectorTest.java | 3 +- core/src/main/scala/kafka/server/KafkaApis.scala | 5 +- 9 files changed, 47 insertions(+), 94 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 47b35492d35..62a4441a29a 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -2993,10 +2993,6 @@ public class ShareConsumerTest { shareConsumer.subscribe(List.of(tp.topic())); ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10); assertEquals(10, records.count()); - assertEquals(Optional.of(15000), shareConsumer.acquisitionLockTimeoutMs()); - - // The updated acquisition lock timeout is only applied when the next poll is called. - alterShareRecordLockDurationMs("group1", 25000); int count = 0; Map<TopicIdPartition, Optional<KafkaException>> result; @@ -3008,7 +3004,6 @@ public class ShareConsumerTest { } result = shareConsumer.commitSync(); assertEquals(1, result.size()); - assertEquals(Optional.of(15000), shareConsumer.acquisitionLockTimeoutMs()); assertEquals(Optional.empty(), result.get(new TopicIdPartition(tpId, tp.partition(), tp.topic()))); count++; } @@ -3016,7 +3011,6 @@ public class ShareConsumerTest { // Get the rest of all 5 records. records = waitedPoll(shareConsumer, 2500L, 5); assertEquals(5, records.count()); - assertEquals(Optional.of(25000), shareConsumer.acquisitionLockTimeoutMs()); for (ConsumerRecord<byte[], byte[]> record : records) { shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); } @@ -3447,19 +3441,6 @@ public class ShareConsumerTest { } } - private void alterShareRecordLockDurationMs(String groupId, int newValue) { - ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId); - Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>(); - alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry( - GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, Integer.toString(newValue)), AlterConfigOp.OpType.SET))); - AlterConfigsOptions alterOptions = new AlterConfigsOptions(); - try (Admin adminClient = createAdminClient()) { - assertDoesNotThrow(() -> adminClient.incrementalAlterConfigs(alterEntries, alterOptions) - .all() - .get(60, TimeUnit.SECONDS), "Failed to alter configs"); - } - } - /** * Test utility which encapsulates a {@link ShareConsumer} whose record processing * behavior can be supplied as a function argument. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index db68877556b..ecb18c83975 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -171,7 +171,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi if (nodesWithPendingRequests.contains(node.id())) { log.trace("Skipping fetch for partition {} because previous fetch request to {} has not been processed", partition, node.id()); } else { - // If there is a leader and no in-flight requests, issue a new fetch. + // if there is a leader and no in-flight requests, issue a new fetch ShareSessionHandler handler = handlerMap.computeIfAbsent(node, k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId))); @@ -234,9 +234,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, nodeId); } else { - log.debug("Leader for the partition is down or has changed, failing acknowledgements for partition {}", tip); + log.debug("Leader for the partition is down or has changed, failing Acknowledgements for partition {}", tip); acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true, Optional.empty()); + maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true); } }); @@ -246,7 +246,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi } }); - // Iterate over the share session handlers and build a list of UnsentRequests. + // Iterate over the share session handlers and build a list of UnsentRequests List<UnsentRequest> requests = handlerMap.entrySet().stream().map(entry -> { Node target = entry.getKey(); ShareSessionHandler handler = entry.getValue(); @@ -302,7 +302,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi // Failing the acknowledgements as we cannot have piggybacked acknowledgements in the initial ShareFetchRequest. log.debug("Cannot send acknowledgements on initial epoch for ShareSession for partition {}", tip); acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception()); - maybeSendShareAcknowledgementEvent(Map.of(tip, acknowledgements), true, Optional.empty()); + maybeSendShareAcknowledgementEvent(Map.of(tip, acknowledgements), true); return false; } else { metricsManager.recordAcknowledgementSent(acknowledgements.size()); @@ -410,11 +410,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi this.isAcknowledgementCommitCallbackRegistered = isAcknowledgementCommitCallbackRegistered; } - private void maybeSendShareAcknowledgementEvent(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, - boolean checkForRenewAcknowledgements, - Optional<Integer> acquisitionLockTimeoutMs) { + private void maybeSendShareAcknowledgementEvent(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, boolean checkForRenewAcknowledgements) { if (isAcknowledgementCommitCallbackRegistered || checkForRenewAcknowledgements) { - ShareAcknowledgementEvent event = new ShareAcknowledgementEvent(acknowledgementsMap, checkForRenewAcknowledgements, acquisitionLockTimeoutMs); + ShareAcknowledgementEvent event = new ShareAcknowledgementEvent(acknowledgementsMap, checkForRenewAcknowledgements); acknowledgeEventHandler.add(event); } } @@ -566,7 +564,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi resultCount.incrementAndGet(); } else { nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - maybeSendShareAcknowledgementEvent(Map.of(tip, nodeAcknowledgements.acknowledgements()), true, Optional.empty()); + maybeSendShareAcknowledgementEvent(Map.of(tip, nodeAcknowledgements.acknowledgements()), true); } } } @@ -641,7 +639,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi } } else { nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - maybeSendShareAcknowledgementEvent(Map.of(tip, nodeAcknowledgements.acknowledgements()), true, Optional.empty()); + maybeSendShareAcknowledgementEvent(Map.of(tip, nodeAcknowledgements.acknowledgements()), true); } } } @@ -679,7 +677,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi } } else { nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - maybeSendShareAcknowledgementEvent(Map.of(tip, nodeAcks.acknowledgements()), true, Optional.empty()); + maybeSendShareAcknowledgementEvent(Map.of(tip, nodeAcks.acknowledgements()), true); } }); @@ -698,7 +696,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi } } else { acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true, Optional.empty()); + maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true); } }); } @@ -802,14 +800,12 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi acks.complete(Errors.forCode(response.error().code()).exception()); metricsManager.recordFailedAcknowledgements(acks.size()); }); - maybeSendShareAcknowledgementEvent(nodeAcknowledgementsInFlight, requestData.isRenewAck(), Optional.empty()); + maybeSendShareAcknowledgementEvent(nodeAcknowledgementsInFlight, requestData.isRenewAck()); } return; } final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = new LinkedHashMap<>(); - final Optional<Integer> responseAcquisitionLockTimeoutMs = response.data().acquisitionLockTimeoutMs() > 0 - ? Optional.of(response.data().acquisitionLockTimeoutMs()) : Optional.empty(); response.data().responses().forEach(topicResponse -> topicResponse.partitions().forEach(partition -> { @@ -842,7 +838,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi acks.complete(Errors.forCode(partitionData.acknowledgeErrorCode()) .exception(partitionData.acknowledgeErrorMessage())); Map<TopicIdPartition, Acknowledgements> acksMap = Map.of(tip, acks); - maybeSendShareAcknowledgementEvent(acksMap, requestData.isRenewAck(), responseAcquisitionLockTimeoutMs); + maybeSendShareAcknowledgementEvent(acksMap, requestData.isRenewAck()); } } @@ -862,7 +858,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi fetchTarget.id(), tip, partitionData, - responseAcquisitionLockTimeoutMs, + response.data().acquisitionLockTimeoutMs() > 0 + ? Optional.of(response.data().acquisitionLockTimeoutMs()) : Optional.empty(), shareFetchMetricsAggregator, requestVersion) ); @@ -880,7 +877,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi if (fetchAcknowledgementsInFlight.get(fetchTarget.id()) != null) { fetchAcknowledgementsInFlight.remove(fetchTarget.id()).forEach((partition, acknowledgements) -> { acknowledgements.complete(new InvalidRecordStateException(INVALID_RESPONSE)); - maybeSendShareAcknowledgementEvent(Map.of(partition, acknowledgements), true, Optional.empty()); + maybeSendShareAcknowledgementEvent(Map.of(partition, acknowledgements), true); }); } @@ -930,7 +927,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi acks.complete(Errors.UNKNOWN_SERVER_ERROR.exception()); } Map<TopicIdPartition, Acknowledgements> acksMap = Map.of(tip, acks); - maybeSendShareAcknowledgementEvent(acksMap, requestData.isRenewAck(), Optional.empty()); + maybeSendShareAcknowledgementEvent(acksMap, requestData.isRenewAck()); } } })); @@ -951,8 +948,6 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi try { log.debug("Completed ShareAcknowledge request from node {} successfully", fetchTarget.id()); ShareAcknowledgeResponse response = (ShareAcknowledgeResponse) resp.responseBody(); - final Optional<Integer> responseAcquisitionLockTimeoutMs = response.data().acquisitionLockTimeoutMs() > 0 - ? Optional.of(response.data().acquisitionLockTimeoutMs()) : Optional.empty(); Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>(); @@ -966,8 +961,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi if (partitionData.errorCode() != Errors.NONE.code()) { metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); } - - acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partitionData.errorCode()), requestData.isRenewAck(), responseAcquisitionLockTimeoutMs); + acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partitionData.errorCode()), requestData.isRenewAck()); })); acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs); @@ -986,8 +980,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi } } else { AtomicBoolean shouldRetry = new AtomicBoolean(false); - - // Check all partition-level error codes. + // Check all partition level error codes response.data().responses().forEach(topicResponse -> topicResponse.partitions().forEach(partitionData -> { Errors partitionError = Errors.forCode(partitionData.errorCode()); TopicIdPartition tip = lookupTopicId(topicResponse.topicId(), partitionData.partitionIndex()); @@ -996,7 +989,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi } handlePartitionError(partitionData, partitionsWithUpdatedLeaderInfo, acknowledgeRequestState, - partitionError, tip, shouldRetry, requestData.isRenewAck(), responseAcquisitionLockTimeoutMs); + partitionError, tip, shouldRetry, requestData.isRenewAck()); })); processRetryLogic(acknowledgeRequestState, shouldRetry, responseCompletionTimeMs); @@ -1042,7 +1035,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi } metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); - acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error), requestData.isRenewAck(), Optional.empty()); + acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error), requestData.isRenewAck()); })); acknowledgeRequestState.processingComplete(); @@ -1063,8 +1056,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi Errors partitionError, TopicIdPartition tip, AtomicBoolean shouldRetry, - boolean isRenewAck, - Optional<Integer> acquisitionLockTimeoutMs) { + boolean isRenewAck) { if (partitionError.exception() != null) { boolean retry = false; if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH || partitionError == Errors.UNKNOWN_TOPIC_OR_PARTITION) { @@ -1083,10 +1075,10 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi } } else { metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); - acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError, isRenewAck, Optional.empty()); + acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError, isRenewAck); } } else { - acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError, isRenewAck, acquisitionLockTimeoutMs); + acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError, isRenewAck); } } @@ -1329,11 +1321,11 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi * Sets the error code in the acknowledgements and sends the response * through a background event. */ - void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors acknowledgeErrorCode, boolean isRenewAck, Optional<Integer> acquisitionLockTimeoutMs) { + void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors acknowledgeErrorCode, boolean isRenewAck) { Acknowledgements acks = inFlightAcknowledgements.remove(tip); if (acks != null) { acks.complete(acknowledgeErrorCode.exception()); - resultHandler.complete(tip, acks, requestType, isRenewAck, acquisitionLockTimeoutMs); + resultHandler.complete(tip, acks, requestType, isRenewAck); } else { log.error("Invalid partition {} received in ShareAcknowledge response", tip); } @@ -1349,7 +1341,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi acks.complete(Errors.REQUEST_TIMED_OUT.exception()); // We do not know whether this is a renew ack, but handling the error as if it were, will ensure // that we do not leave dangling acknowledgements - resultHandler.complete(tip, acks, requestType, true, Optional.empty()); + resultHandler.complete(tip, acks, requestType, true); } } @@ -1368,7 +1360,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi } // We do not know whether this is a renew ack, but handling the error as if it were, will ensure // that we do not leave dangling acknowledgements - resultHandler.complete(tip, acks, requestType, true, Optional.empty()); + resultHandler.complete(tip, acks, requestType, true); }); acknowledgementsMapToClear.clear(); processingComplete(); @@ -1396,7 +1388,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi acknowledgements.complete(exception); // We do not know whether this is a renew ack, but handling the error as if it were, will ensure // that we do not leave dangling acknowledgements - resultHandler.complete(partition, acknowledgements, requestType, true, Optional.empty()); + resultHandler.complete(partition, acknowledgements, requestType, true); }); inFlightAcknowledgements.clear(); } @@ -1440,7 +1432,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi } /** - * Sends a ShareAcknowledgementEvent event to the application when it is done + * Sends a ShareAcknowledgeCommitCallback event to the application when it is done * processing all the remaining acknowledgement request states. * Also manages completing the future for synchronous acknowledgement commit by counting * down the results as they are known and completing the future at the end. @@ -1465,17 +1457,17 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi * Handle the result of a ShareAcknowledge request sent to one or more nodes and * signal the completion when all results are known. */ - public void complete(TopicIdPartition partition, Acknowledgements acknowledgements, AcknowledgeRequestType type, boolean isRenewAck, Optional<Integer> acquisitionLockTimeoutMs) { + public void complete(TopicIdPartition partition, Acknowledgements acknowledgements, AcknowledgeRequestType type, boolean isRenewAck) { if (type.equals(AcknowledgeRequestType.COMMIT_ASYNC)) { if (acknowledgements != null) { - maybeSendShareAcknowledgementEvent(Map.of(partition, acknowledgements), isRenewAck, acquisitionLockTimeoutMs); + maybeSendShareAcknowledgementEvent(Map.of(partition, acknowledgements), isRenewAck); } } else { if (acknowledgements != null) { result.put(partition, acknowledgements); } if (remainingResults != null && remainingResults.decrementAndGet() == 0) { - maybeSendShareAcknowledgementEvent(result, isRenewAck, acquisitionLockTimeoutMs); + maybeSendShareAcknowledgementEvent(result, isRenewAck); future.ifPresent(future -> future.complete(result)); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 3a7eb81df60..6b97035c9c4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -138,7 +138,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> { completedAcknowledgements.add(event.acknowledgementsMap()); } if (event.checkForRenewAcknowledgements()) { - currentFetch.renew(event.acknowledgementsMap(), event.acquisitionLockTimeoutMs()); + currentFetch.renew(event.acknowledgementsMap()); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java index 83a43cb69ae..cd5203524ab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java @@ -43,7 +43,6 @@ import java.util.Optional; public class ShareFetch<K, V> { private final Map<TopicIdPartition, ShareInFlightBatch<K, V>> batches; private Optional<Integer> acquisitionLockTimeoutMs; - private Optional<Integer> acquisitionLockTimeoutMsRenewed; public static <K, V> ShareFetch<K, V> empty() { return new ShareFetch<>(new HashMap<>(), Optional.empty()); @@ -52,7 +51,6 @@ public class ShareFetch<K, V> { private ShareFetch(Map<TopicIdPartition, ShareInFlightBatch<K, V>> batches, Optional<Integer> acquisitionLockTimeoutMs) { this.batches = batches; this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs; - this.acquisitionLockTimeoutMsRenewed = Optional.empty(); } /** @@ -144,10 +142,6 @@ public class ShareFetch<K, V> { for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : batches.entrySet()) { entry.getValue().takeRenewals(); } - // Any acquisition lock timeout updated by renewal is applied as the renewed records are move back to in-flight - if (acquisitionLockTimeoutMsRenewed.isPresent()) { - acquisitionLockTimeoutMs = acquisitionLockTimeoutMsRenewed; - } } /** @@ -241,18 +235,16 @@ public class ShareFetch<K, V> { * Handles completed renew acknowledgements by returning successfully renewed records * to the set of in-flight records. * - * @param acknowledgementsMap Map from topic-partition to acknowledgements for - * completed renew acknowledgements - * @param acquisitionLockTimeoutMs Optional updated acquisition lock timeout + * @param acknowledgementsMap Map from topic-partition to acknowledgements for + * completed renew acknowledgements * * @return The number of records renewed */ - public int renew(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, Optional<Integer> acquisitionLockTimeoutMs) { + public int renew(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) { int recordsRenewed = 0; for (Map.Entry<TopicIdPartition, Acknowledgements> entry : acknowledgementsMap.entrySet()) { recordsRenewed += batches.get(entry.getKey()).renew(entry.getValue()); } - acquisitionLockTimeoutMsRenewed = acquisitionLockTimeoutMs; return recordsRenewed; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java index ff7f7598f68..9dfc06b1f42 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; import org.apache.kafka.common.TopicIdPartition; import java.util.Map; -import java.util.Optional; /** * This is the class of events created by the {@link ConsumerNetworkThread network thread} to indicate completion @@ -31,14 +30,11 @@ public class ShareAcknowledgementEvent { private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap; private final boolean checkForRenewAcknowledgements; - private final Optional<Integer> acquisitionLockTimeoutMs; public ShareAcknowledgementEvent(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, - boolean checkForRenewAcknowledgements, - Optional<Integer> acquisitionLockTimeoutMs) { + boolean checkForRenewAcknowledgements) { this.acknowledgementsMap = acknowledgementsMap; this.checkForRenewAcknowledgements = checkForRenewAcknowledgements; - this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs; } public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() { @@ -48,8 +44,4 @@ public class ShareAcknowledgementEvent { public boolean checkForRenewAcknowledgements() { return checkForRenewAcknowledgements; } - - public Optional<Integer> acquisitionLockTimeoutMs() { - return acquisitionLockTimeoutMs; - } } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index dc05257bd6e..3064f5125c7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -485,16 +485,16 @@ public class ShareConsumeRequestManagerTest { ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(null, Optional.empty()); // Passing null acknowledgements should mean we do not send the background event at all. - resultHandler.complete(tip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false, Optional.empty()); + resultHandler.complete(tip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false); assertEquals(0, completedAcknowledgements.size()); // Setting the request type to COMMIT_SYNC should still not send any background event // as we have initialized remainingResults to null. - resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false, Optional.empty()); + resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false); assertEquals(0, completedAcknowledgements.size()); // Sending non-null acknowledgements means we do send the background event - resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false, Optional.empty()); + resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false); assertEquals(3, completedAcknowledgements.get(0).get(tip0).size()); } @@ -514,16 +514,16 @@ public class ShareConsumeRequestManagerTest { ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future)); // We only send the background event after all results have been completed. - resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false, Optional.empty()); + resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false); assertEquals(0, completedAcknowledgements.size()); assertFalse(future.isDone()); - resultHandler.complete(t2ip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false, Optional.empty()); + resultHandler.complete(t2ip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false); assertEquals(0, completedAcknowledgements.size()); assertFalse(future.isDone()); // After third response is received, we send the background event. - resultHandler.complete(tip1, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false, Optional.empty()); + resultHandler.complete(tip1, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false); assertEquals(1, completedAcknowledgements.size()); assertEquals(2, completedAcknowledgements.get(0).size()); assertEquals(3, completedAcknowledgements.get(0).get(tip0).size()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index 6943c39f060..188a7066981 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -539,7 +539,7 @@ public class ShareConsumerImplTest { Acknowledgements acks = Acknowledgements.empty(); acks.add(0, AcknowledgeType.RENEW); acks.complete(null); - ShareAcknowledgementEvent e = new ShareAcknowledgementEvent(Map.of(tip, acks), true, Optional.empty()); + ShareAcknowledgementEvent e = new ShareAcknowledgementEvent(Map.of(tip, acks), true); acknowledgementEventQueue.add(e); records = consumer.poll(Duration.ofMillis(100)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java index e404d7b3e22..ebc85ed493b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java @@ -184,12 +184,11 @@ public class ShareFetchCollectorTest { Acknowledgements acks = acknowledgementsMap.get(topicAPartition0).acknowledgements(); acks.complete(null); - fetch.renew(Map.of(topicAPartition0, acks), Optional.of(20000)); + fetch.renew(Map.of(topicAPartition0, acks)); assertTrue(fetch.hasRenewals()); fetch.takeRenewedRecords(); assertFalse(fetch.hasRenewals()); assertEquals(DEFAULT_MAX_POLL_RECORDS, fetch.numRecords()); - assertEquals(Optional.of(20000), fetch.acquisitionLockTimeoutMs()); // Now attempt to collect more records from the fetch buffer. fetch = fetchCollector.collect(fetchBuffer); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6542831859a..b18f14794f1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -4014,9 +4014,6 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for processing a share acknowledge response, invoked before throttling def processShareAcknowledgeResponse(responseAcknowledgeData: Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], request: RequestChannel.Request): ShareAcknowledgeResponse = { - val shareAcknowledgeRequest = request.body[ShareAcknowledgeRequest] - val groupId = shareAcknowledgeRequest.data.groupId - val partitions = new util.LinkedHashMap[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData] val nodeEndpoints = new mutable.HashMap[Int, Node] responseAcknowledgeData.foreach{ case(tp, partitionData) => @@ -4039,7 +4036,7 @@ class KafkaApis(val requestChannel: RequestChannel, 0, partitions, nodeEndpoints.values.toList.asJava, - ShareFetchUtils.recordLockDurationMsOrDefault(groupConfigManager, groupId, config.shareGroupConfig.shareGroupRecordLockDurationMs) + config.shareGroupConfig.shareGroupRecordLockDurationMs ) }
