This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a8f49999ccf KAFKA-19019: Add support for remote storage fetch for share groups (#19437) a8f49999ccf is described below commit a8f49999ccf6a33b986bf895884f1a5e9708fb47 Author: Abhinav Dixit <adi...@confluent.io> AuthorDate: Tue Apr 22 04:00:24 2025 +0530 KAFKA-19019: Add support for remote storage fetch for share groups (#19437) This PR adds support for remote storage fetch for share groups. Remote storage fetch for consumer groups has a limitation: a remote fetch can be performed for only a single topic partition per fetch request. Since the logic of share fetch requests is largely based on how consumer groups work, we follow similar logic in implementing remote storage fetch. However, this limitation should be addressed as part of KAFKA-19133, which should allow us to perform remote fetches for multiple topic partitions in a single share fetch request. Reviewers: Jun Rao <jun...@gmail.com> --- .../java/kafka/server/share/DelayedShareFetch.java | 466 ++++++++++++++--- .../kafka/server/share/DelayedShareFetchTest.java | 573 ++++++++++++++++++++- 2 files changed, 977 insertions(+), 62 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index d68ed06d307..5bd5f4ea6aa 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -22,6 +22,10 @@ import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.protocol.Errors; import 
org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.Time; @@ -34,8 +38,12 @@ import org.apache.kafka.server.share.fetch.ShareFetch; import org.apache.kafka.server.share.fetch.ShareFetchPartitionData; import org.apache.kafka.server.share.metrics.ShareGroupMetrics; import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.storage.log.FetchPartitionData; +import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; +import org.apache.kafka.storage.internals.log.RemoteLogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import com.yammer.metrics.core.Meter; @@ -44,10 +52,16 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.function.BiConsumer; @@ -83,7 +97,9 @@ public class DelayedShareFetch extends DelayedOperation { // Tracks the start time to acquire any share partition for a fetch request. 
private long acquireStartTimeMs; private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired; - private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched; + private LinkedHashMap<TopicIdPartition, LogReadResult> localPartitionsAlreadyFetched; + private Optional<RemoteFetch> remoteFetchOpt; + private Optional<Exception> remoteStorageFetchException; /** * This function constructs an instance of delayed share fetch operation for completing share fetch @@ -110,10 +126,24 @@ public class DelayedShareFetch extends DelayedOperation { sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM), shareGroupMetrics, - time + time, + Optional.empty() ); } + /** + * This function constructs an instance of delayed share fetch operation for completing share fetch + * requests instantaneously or with delay. The direct usage of this constructor is only from tests. + * + * @param shareFetch The share fetch parameters of the share fetch request. + * @param replicaManager The replica manager instance used to read from log/complete the request. + * @param exceptionHandler The handler to complete share fetch requests with exception. + * @param sharePartitions The share partitions referenced in the share fetch request. + * @param partitionMaxBytesStrategy The strategy to identify the max bytes for topic partitions in the share fetch request. + * @param shareGroupMetrics The share group metrics to record the metrics. + * @param time The system time. + * @param remoteFetchOpt Optional containing an in-flight remote fetch object or an empty optional. 
+ */ DelayedShareFetch( ShareFetch shareFetch, ReplicaManager replicaManager, @@ -121,19 +151,22 @@ public class DelayedShareFetch extends DelayedOperation { LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions, PartitionMaxBytesStrategy partitionMaxBytesStrategy, ShareGroupMetrics shareGroupMetrics, - Time time + Time time, + Optional<RemoteFetch> remoteFetchOpt ) { super(shareFetch.fetchParams().maxWaitMs, Optional.empty()); this.shareFetch = shareFetch; this.replicaManager = replicaManager; this.partitionsAcquired = new LinkedHashMap<>(); - this.partitionsAlreadyFetched = new LinkedHashMap<>(); + this.localPartitionsAlreadyFetched = new LinkedHashMap<>(); this.exceptionHandler = exceptionHandler; this.sharePartitions = sharePartitions; this.partitionMaxBytesStrategy = partitionMaxBytesStrategy; this.shareGroupMetrics = shareGroupMetrics; this.time = time; this.acquireStartTimeMs = time.hiResClockMs(); + this.remoteFetchOpt = remoteFetchOpt; + this.remoteStorageFetchException = Optional.empty(); // Register metrics for DelayedShareFetch. KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedShareFetchMetrics"); this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS); @@ -152,58 +185,68 @@ public class DelayedShareFetch extends DelayedOperation { @Override public void onComplete() { // We are utilizing lock so that onComplete doesn't do a dirty read for instance variables - - // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread. + // partitionsAcquired and localPartitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread. 
lock.lock(); log.trace("Completing the delayed share fetch request for group {}, member {}, " + "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(), partitionsAcquired.keySet()); try { - LinkedHashMap<TopicIdPartition, Long> topicPartitionData; - // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch. - if (partitionsAcquired.isEmpty()) { - topicPartitionData = acquirablePartitions(); - // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks - // for the share partition, hence if no partitions are yet acquired by tryComplete, - // we record the metric here. Do not check if the request has successfully acquired any - // partitions now or not, as then the upper bound of request timeout shall be recorded - // for the metric. - updateAcquireElapsedTimeMetric(); - } else { - // tryComplete invoked forceComplete, so we can use the data from tryComplete. - topicPartitionData = partitionsAcquired; - } - - if (topicPartitionData.isEmpty()) { - // No locks for share partitions could be acquired, so we complete the request with an empty response. - shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0); - shareFetch.maybeComplete(Map.of()); - return; + if (remoteStorageFetchException.isPresent()) { + completeErroneousRemoteShareFetchRequest(); + } else if (remoteFetchOpt.isPresent()) { + completeRemoteStorageShareFetchRequest(); } else { - // Update metric to record acquired to requested partitions. 
- double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size(); - shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100)); + completeLocalLogShareFetchRequest(); } - log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", - topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); - - completeShareFetchRequest(topicPartitionData); } finally { lock.unlock(); } } - private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) { + private void completeLocalLogShareFetchRequest() { + LinkedHashMap<TopicIdPartition, Long> topicPartitionData; + // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch. + if (partitionsAcquired.isEmpty()) { + topicPartitionData = acquirablePartitions(sharePartitions); + // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks + // for the share partition, hence if no partitions are yet acquired by tryComplete, + // we record the metric here. Do not check if the request has successfully acquired any + // partitions now or not, as then the upper bound of request timeout shall be recorded + // for the metric. + updateAcquireElapsedTimeMetric(); + } else { + // tryComplete invoked forceComplete, so we can use the data from tryComplete. + topicPartitionData = partitionsAcquired; + } + + if (topicPartitionData.isEmpty()) { + // No locks for share partitions could be acquired, so we complete the request with an empty response. + shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0); + shareFetch.maybeComplete(Map.of()); + return; + } else { + // Update metric to record acquired to requested partitions. 
+ double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size(); + shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100)); + } + log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", + topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); + + processAcquiredTopicPartitionsForLocalLogFetch(topicPartitionData); + } + + private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) { try { LinkedHashMap<TopicIdPartition, LogReadResult> responseData; - if (partitionsAlreadyFetched.isEmpty()) + if (localPartitionsAlreadyFetched.isEmpty()) responseData = readFromLog( topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size())); else // There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting // updated in a different tryComplete thread. - responseData = combineLogReadResponse(topicPartitionData, partitionsAlreadyFetched); + responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched); List<ShareFetchPartitionData> shareFetchPartitionDataList = new ArrayList<>(); responseData.forEach((topicIdPartition, logReadResult) -> @@ -225,15 +268,7 @@ public class DelayedShareFetch extends DelayedOperation { log.error("Error processing delayed share fetch request", e); handleFetchException(shareFetch, topicPartitionData.keySet(), e); } finally { - // Releasing the lock to move ahead with the next request in queue. - releasePartitionLocks(topicPartitionData.keySet()); - // If we have a fetch request completed for a topic-partition, we release the locks for that partition, - // then we should check if there is a pending share fetch request for the topic-partition and complete it. 
- // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if - // we directly call delayedShareFetchPurgatory.checkAndComplete - replicaManager.addToActionQueue(() -> topicPartitionData.keySet().forEach(topicIdPartition -> - replicaManager.completeDelayedShareFetchRequest( - new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())))); + releasePartitionLocksAndAddToActionQueue(topicPartitionData.keySet()); } } @@ -242,8 +277,12 @@ public class DelayedShareFetch extends DelayedOperation { */ @Override public boolean tryComplete() { - LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions(); + // Check to see if the remote fetch is in flight. If there is an in flight remote fetch we want to resolve it first. + if (remoteFetchOpt.isPresent()) { + return maybeCompletePendingRemoteFetch(); + } + LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions(sharePartitions); try { if (!topicPartitionData.isEmpty()) { // Update the metric to record the time taken to acquire the locks for the share partitions. @@ -252,17 +291,17 @@ public class DelayedShareFetch extends DelayedOperation { // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for // those topic partitions. LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData); + // Store the remote fetch info and the topic partition for which we need to perform remote fetch. 
+ Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse); + + if (topicPartitionRemoteFetchInfoOpt.isPresent()) { + return maybeProcessRemoteFetch(topicPartitionData, topicPartitionRemoteFetchInfoOpt.get()); + } maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse); if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) { partitionsAcquired = topicPartitionData; - partitionsAlreadyFetched = replicaManagerReadResponse; - boolean completedByMe = forceComplete(); - // If invocation of forceComplete is not successful, then that means the request is already completed - // hence release the acquired locks. - if (!completedByMe) { - releasePartitionLocks(partitionsAcquired.keySet()); - } - return completedByMe; + localPartitionsAlreadyFetched = replicaManagerReadResponse; + return forceCompleteRequest(); } else { log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, " + "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(), @@ -277,10 +316,18 @@ public class DelayedShareFetch extends DelayedOperation { return false; } catch (Exception e) { log.error("Error processing delayed share fetch request", e); - releasePartitionLocks(topicPartitionData.keySet()); - partitionsAcquired.clear(); - partitionsAlreadyFetched.clear(); - return forceComplete(); + // In case we have a remote fetch exception, we have already released locks for partitions which have potential + // local log read. 
We do not release locks for partitions which have a remote storage read because we need to + // complete the share fetch request in onComplete and if we release the locks early here, some other DelayedShareFetch + // request might get the locks for those partitions without this one getting complete. + if (remoteStorageFetchException.isEmpty()) { + releasePartitionLocks(topicPartitionData.keySet()); + partitionsAcquired.clear(); + localPartitionsAlreadyFetched.clear(); + return forceCompleteRequest(); + } else { + return forceCompleteRequest(); + } } } @@ -288,11 +335,13 @@ public class DelayedShareFetch extends DelayedOperation { * Prepare fetch request structure for partitions in the share fetch request for which we can acquire records. */ // Visible for testing - LinkedHashMap<TopicIdPartition, Long> acquirablePartitions() { + LinkedHashMap<TopicIdPartition, Long> acquirablePartitions( + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitionsForAcquire + ) { // Initialize the topic partitions for which the fetch should be attempted. LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new LinkedHashMap<>(); - sharePartitions.forEach((topicIdPartition, sharePartition) -> { + sharePartitionsForAcquire.forEach((topicIdPartition, sharePartition) -> { // Add the share partition to the list of partitions to be fetched only if we can // acquire the fetch lock on it. if (sharePartition.maybeAcquireFetchLock()) { @@ -529,8 +578,307 @@ public class DelayedShareFetch extends DelayedOperation { return lock; } + // Visible for testing. + RemoteFetch remoteFetch() { + return remoteFetchOpt.orElse(null); + } + // Visible for testing. 
Meter expiredRequestMeter() { return expiredRequestMeter; } + + private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo( + LinkedHashMap<TopicIdPartition, Long> topicPartitionData, + LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse + ) { + Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = Optional.empty(); + for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) { + TopicIdPartition topicIdPartition = entry.getKey(); + LogReadResult logReadResult = entry.getValue(); + if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) { + // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for + // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work, + // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform + // fetch for multiple remote fetch topic partition in a single share fetch request + topicPartitionRemoteFetchInfoOpt = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult)); + partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition)); + break; + } + } + return topicPartitionRemoteFetchInfoOpt; + } + + private boolean maybeProcessRemoteFetch( + LinkedHashMap<TopicIdPartition, Long> topicPartitionData, + TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo + ) { + Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new LinkedHashSet<>(); + topicPartitionData.keySet().forEach(topicIdPartition -> { + // topic partitions for which fetch would not be happening in this share fetch request. 
+ if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) { + nonRemoteFetchTopicPartitions.add(topicIdPartition); + } + }); + // Release fetch lock for the topic partitions that were acquired but were not a part of remote fetch and add + // them to the delayed actions queue. + releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions); + processRemoteFetchOrException(topicPartitionRemoteFetchInfo); + // Check if remote fetch can be completed. + return maybeCompletePendingRemoteFetch(); + } + + /** + * Throws an exception if a task for remote storage fetch could not be scheduled successfully else updates remoteFetchOpt. + * @param topicPartitionRemoteFetchInfo - The remote storage fetch information. + */ + private void processRemoteFetchOrException( + TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo + ) { + TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition(); + RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get(); + + Future<Void> remoteFetchTask; + CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>(); + try { + remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead( + remoteStorageFetchInfo, + result -> { + remoteFetchResult.complete(result); + replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition())); + } + ); + } catch (Exception e) { + // Throw the error if any in scheduling the remote fetch task. + remoteStorageFetchException = Optional.of(e); + throw e; + } + remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, topicPartitionRemoteFetchInfo.logReadResult(), remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo)); + } + + /** + * This function checks if the remote fetch can be completed or not. 
It should always be called once you confirm remoteFetchOpt.isPresent(). + * The operation can be completed if: + * Case a: The partition is in an offline log directory on this broker + * Case b: This broker does not know the partition it tries to fetch + * Case c: This broker is no longer the leader of the partition it tries to fetch + * Case d: The remote storage read request completed (succeeded or failed) + * @return boolean representing whether the remote fetch is completed or not. + */ + private boolean maybeCompletePendingRemoteFetch() { + boolean canComplete = false; + + TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition(); + try { + replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); + } catch (KafkaStorageException e) { // Case a + log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + canComplete = true; + } catch (UnknownTopicOrPartitionException e) { // Case b + log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + canComplete = true; + } catch (NotLeaderOrFollowerException e) { // Case c + log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + canComplete = true; + } + + if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d + return forceCompleteRequest(); + } else + return false; + } + + /** + * This function completes a share fetch request for which we have identified erroneous remote storage fetch in tryComplete() + * It should only be called when we know that there is remote fetch in-flight/completed. 
+ */ + private void completeErroneousRemoteShareFetchRequest() { + try { + handleFetchException(shareFetch, partitionsAcquired.keySet(), remoteStorageFetchException.get()); + } finally { + releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet()); + } + + } + + private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topicIdPartitions) { + if (topicIdPartitions.isEmpty()) { + return; + } + // Releasing the lock to move ahead with the next request in queue. + releasePartitionLocks(topicIdPartitions); + // If we have a fetch request completed for a topic-partition, we release the locks for that partition, + // then we should check if there is a pending share fetch request for the topic-partition and complete it. + // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if + // we directly call delayedShareFetchPurgatory.checkAndComplete + replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> + replicaManager.completeDelayedShareFetchRequest( + new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())))); + } + + /** + * This function completes a share fetch request for which we have identified remoteFetch during tryComplete() + * Note - This function should only be called when we know that there is remote fetch. 
+ */ + private void completeRemoteStorageShareFetchRequest() { + LinkedHashMap<TopicIdPartition, Long> acquiredNonRemoteFetchTopicPartitionData = new LinkedHashMap<>(); + try { + List<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<>(); + int readableBytes = 0; + if (remoteFetchOpt.get().remoteFetchResult().isDone()) { + RemoteFetch remoteFetch = remoteFetchOpt.get(); + RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get(); + if (remoteLogReadResult.error.isPresent()) { + Throwable error = remoteLogReadResult.error.get(); + // If there is any error for the remote fetch topic partition, we populate the error accordingly. + shareFetchPartitionData.add( + new ShareFetchPartitionData( + remoteFetch.topicIdPartition(), + partitionsAcquired.get(remoteFetch.topicIdPartition()), + ReplicaManager.createLogReadResult(error).toFetchPartitionData(false) + ) + ); + } else { + FetchDataInfo info = remoteLogReadResult.fetchDataInfo.get(); + TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition(); + LogReadResult logReadResult = remoteFetch.logReadResult(); + shareFetchPartitionData.add( + new ShareFetchPartitionData( + topicIdPartition, + partitionsAcquired.get(remoteFetch.topicIdPartition()), + new FetchPartitionData( + logReadResult.error(), + logReadResult.highWatermark(), + logReadResult.leaderLogStartOffset(), + info.records, + Optional.empty(), + logReadResult.lastStableOffset().isDefined() ? OptionalLong.of((Long) logReadResult.lastStableOffset().get()) : OptionalLong.empty(), + info.abortedTransactions, + logReadResult.preferredReadReplica().isDefined() ? OptionalInt.of((Integer) logReadResult.preferredReadReplica().get()) : OptionalInt.empty(), + false + ) + ) + ); + readableBytes += info.records.sizeInBytes(); + } + } else { + cancelRemoteFetchTask(); + } + + // If remote fetch bytes < shareFetch.fetchParams().maxBytes, then we will try for a local read. 
+ if (readableBytes < shareFetch.fetchParams().maxBytes) { + // Get the local log read based topic partitions. + LinkedHashMap<TopicIdPartition, SharePartition> nonRemoteFetchSharePartitions = new LinkedHashMap<>(); + sharePartitions.forEach((topicIdPartition, sharePartition) -> { + if (!partitionsAcquired.containsKey(topicIdPartition)) { + nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition); + } + }); + acquiredNonRemoteFetchTopicPartitionData = acquirablePartitions(nonRemoteFetchSharePartitions); + if (!acquiredNonRemoteFetchTopicPartitionData.isEmpty()) { + log.trace("Fetchable local share partitions for a remote share fetch request data: {} with groupId: {} fetch params: {}", + acquiredNonRemoteFetchTopicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); + + LinkedHashMap<TopicIdPartition, LogReadResult> responseData = readFromLog( + acquiredNonRemoteFetchTopicPartitionData, + partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - readableBytes, acquiredNonRemoteFetchTopicPartitionData.keySet(), acquiredNonRemoteFetchTopicPartitionData.size())); + for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet()) { + if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) { + shareFetchPartitionData.add( + new ShareFetchPartitionData( + entry.getKey(), + acquiredNonRemoteFetchTopicPartitionData.get(entry.getKey()), + entry.getValue().toFetchPartitionData(false) + ) + ); + } + } + } + } + + // Update metric to record acquired to requested partitions. 
+ double acquiredRatio = (double) (partitionsAcquired.size() + acquiredNonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size(); + if (acquiredRatio > 0) + shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (acquiredRatio * 100)); + + Map<TopicIdPartition, ShareFetchResponseData.PartitionData> remoteFetchResponse = ShareFetchUtils.processFetchResponse( + shareFetch, shareFetchPartitionData, sharePartitions, replicaManager, exceptionHandler); + shareFetch.maybeComplete(remoteFetchResponse); + log.trace("Remote share fetch request completed successfully, response: {}", remoteFetchResponse); + } catch (InterruptedException | ExecutionException e) { + log.error("Exception occurred in completing remote fetch {} for delayed share fetch request {}", remoteFetchOpt.get(), e); + handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e); + } catch (Exception e) { + log.error("Unexpected error in processing delayed share fetch request", e); + handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e); + } finally { + Set<TopicIdPartition> topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet()); + topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitionData.keySet()); + releasePartitionLocksAndAddToActionQueue(topicIdPartitions); + } + } + + private void handleExceptionInCompletingRemoteStorageShareFetchRequest( + Set<TopicIdPartition> acquiredNonRemoteFetchTopicPartitions, + Exception e + ) { + Set<TopicIdPartition> topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet()); + topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitions); + handleFetchException(shareFetch, topicIdPartitions, e); + } + + /** + * Cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is + * already running as it may force closing opened/cached resources as 
transaction index. + * Note - This function should only be called when we know that there is remote fetch. + */ + private void cancelRemoteFetchTask() { + boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false); + if (!cancelled) { + log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}", + remoteFetchOpt.get().remoteFetchInfo(), remoteFetchOpt.get().remoteFetchTask().isDone()); + } + } + + private boolean forceCompleteRequest() { + boolean completedByMe = forceComplete(); + // If the delayed operation is completed by me, the acquired locks are already released in onComplete(). + // Otherwise, we need to release the acquired locks. + if (!completedByMe) { + releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet()); + } + return completedByMe; + } + + public record RemoteFetch( + TopicIdPartition topicIdPartition, + LogReadResult logReadResult, + Future<Void> remoteFetchTask, + CompletableFuture<RemoteLogReadResult> remoteFetchResult, + RemoteStorageFetchInfo remoteFetchInfo + ) { + @Override + public String toString() { + return "RemoteFetch(" + + "topicIdPartition=" + topicIdPartition + + ", logReadResult=" + logReadResult + + ", remoteFetchTask=" + remoteFetchTask + + ", remoteFetchResult=" + remoteFetchResult + + ", remoteFetchInfo=" + remoteFetchInfo + + ")"; + } + } + + public record TopicPartitionRemoteFetchInfo( + TopicIdPartition topicIdPartition, + LogReadResult logReadResult + ) { + @Override + public String toString() { + return "TopicPartitionRemoteFetchInfo(" + + "topicIdPartition=" + topicIdPartition + + ", logReadResult=" + logReadResult + + ")"; + } + } } diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index bb8b51b40e2..43ece70ca0e 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ 
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -25,10 +25,15 @@ import kafka.server.ReplicaQuota; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.storage.RemoteLogManager; import org.apache.kafka.server.purgatory.DelayedOperationKey; import org.apache.kafka.server.purgatory.DelayedOperationPurgatory; import org.apache.kafka.server.share.SharePartitionKey; @@ -46,6 +51,8 @@ import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; +import org.apache.kafka.storage.internals.log.RemoteLogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.junit.jupiter.api.AfterEach; @@ -61,10 +68,15 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; +import scala.Option; import scala.Tuple2; +import scala.collection.Seq; import scala.jdk.javaapi.CollectionConverters; import static 
kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL; @@ -73,6 +85,7 @@ import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDel import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -95,6 +108,8 @@ public class DelayedShareFetchTest { private static final FetchParams FETCH_PARAMS = new FetchParams( FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true); + private static final FetchDataInfo REMOTE_FETCH_INFO = new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), + MemoryRecords.EMPTY, false, Optional.empty(), Optional.of(mock(RemoteStorageFetchInfo.class))); private static final BrokerTopicStats BROKER_TOPIC_STATS = new BrokerTopicStats(); private Timer mockTimer; @@ -487,7 +502,7 @@ public class DelayedShareFetchTest { delayedShareFetch.forceComplete(); assertTrue(delayedShareFetch.isCompleted()); // Verifying that the first forceComplete calls acquirablePartitions method in DelayedShareFetch. - Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(); + Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(sharePartitions); assertEquals(0, future.join().size()); assertTrue(delayedShareFetch.lock().tryLock()); delayedShareFetch.lock().unlock(); @@ -497,7 +512,7 @@ public class DelayedShareFetchTest { delayedShareFetch.forceComplete(); assertTrue(delayedShareFetch.isCompleted()); // Verifying that the second forceComplete does not call acquirablePartitions method in DelayedShareFetch. 
- Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(); + Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(sharePartitions); Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any()); assertTrue(delayedShareFetch.lock().tryLock()); // Assert both metrics shall be recorded only once. @@ -1155,6 +1170,525 @@ public class DelayedShareFetchTest { assertEquals(1, delayedShareFetch.expiredRequestMeter().count()); } + @Test + public void testRemoteStorageFetchTryCompleteReturnsFalse() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + // All the topic partitions are acquirable. 
+ when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset matches with the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking local log read result for tp1 and remote storage read result for tp2. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock. 
+ RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class)); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertFalse(delayedShareFetch.tryComplete()); + assertFalse(delayedShareFetch.isCompleted()); + // Remote fetch object gets created for delayed share fetch object. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for local log read topic partitions tp0 and tp1. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1)); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchTryCompleteThrowsException() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // All the topic partitions are acquirable. 
+ when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + + // Fetch offset does not match with the cached entry for sp0 and sp1. Hence, a replica manager fetch will happen for sp0 and sp1. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking local log read result for tp0 and remote storage read result for tp1. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Exception will be thrown during the creation of remoteFetch object. 
+ RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenThrow(new RejectedExecutionException("Exception thrown")); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler(); + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withExceptionHandler(exceptionHandler) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + // tryComplete returns true and goes to forceComplete once the exception occurs. + assertTrue(delayedShareFetch.tryComplete()); + assertTrue(delayedShareFetch.isCompleted()); + // The future of shareFetch completes. + assertTrue(shareFetch.isCompleted()); + assertFalse(future.isCompletedExceptionally()); + assertEquals(Set.of(tp1), future.join().keySet()); + // Exception occurred and was handled. + Mockito.verify(exceptionHandler, times(1)).accept(any(), any()); + // Verify the locks are released for both local and remote read topic partitions tp0 and tp1 because of exception occurrence. 
+ Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1)); + Mockito.verify(delayedShareFetch, times(1)).onComplete(); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset matches with the cached entry for sp0 but not for sp1 and sp2. 
Hence, a replica manager fetch will happen for sp1 and sp2 during tryComplete. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + // Mocking local log read result for tp1 and remote storage read result for tp2 on first replicaManager readFromLog call(from tryComplete). + // Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call(from onComplete). + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2)) + ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of()) + ).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock but the broker becomes unavailable. 
+ Future<Void> remoteFetchTask = mock(Future.class); + doAnswer(invocation -> { + when(remoteFetchTask.isCancelled()).thenReturn(true); + return false; + }).when(remoteFetchTask).cancel(false); + + when(remoteFetchTask.cancel(false)).thenReturn(true); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenReturn(remoteFetchTask); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled()); + // Partition locks should be released for all 3 topic partitions + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2)); + assertTrue(shareFetch.isCompleted()); + // Share fetch response contained tp0 and tp1 (local fetch) but not tp2, since it errored out. 
+ assertEquals(Set.of(tp0, tp1), future.join().keySet()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // sp0 is acquirable, sp1 is not acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(false); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match with the cached entry for sp0. Hence, a replica manager fetch will happen for sp0. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking remote storage read result for tp0. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. 
+ RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.empty(), + Optional.of(new TimeoutException("Error occurred while creating remote fetch result")) // Remote fetch result is returned with an error. + ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. + Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for tp0. 
+ Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.REQUEST_TIMED_OUT.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + + SharePartition sp0 = mock(SharePartition.class); + + // sp0 is acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match with the cached entry for sp0. Hence, a replica manager fetch will happen for sp0. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking remote storage read result for tp0. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.of(REMOTE_FETCH_INFO), + Optional.empty() // Remote fetch result is returned successfully without error. 
+ ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. + Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0))) + .build()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for tp0. 
+ Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset does not match with the cached entry for sp0, sp1 and 
sp2. Hence, a replica manager fetch will happen for all of them in tryComplete. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp2.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + // Mocking local log read result for tp0, tp1 and remote storage read result for tp2 on first replicaManager readFromLog call(from tryComplete). + // Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call(from onComplete). + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of(tp2)) + ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of()) + ).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.of(REMOTE_FETCH_INFO), + Optional.empty() // Remote fetch result is returned successfully without error. 
+ ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. + Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withReplicaManager(replicaManager) + .withSharePartitions(sharePartitions) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // The future of shareFetch completes. + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0, tp1, tp2), future.join().keySet()); + // Verify the locks are released for both local log and remote storage read topic partitions tp0, tp1 and tp2. 
+ Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2)); + assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode()); + assertEquals(Errors.NONE.code(), future.join().get(tp1).errorCode()); + assertEquals(Errors.NONE.code(), future.join().get(tp2).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // sp0 and sp1 are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match with the cached entry for sp0 and sp1. Hence, a replica manager fetch will happen for both. 
+ when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + LinkedHashSet<TopicIdPartition> remoteStorageFetchPartitions = new LinkedHashSet<>(); + remoteStorageFetchPartitions.add(tp0); + remoteStorageFetchPartitions.add(tp1); + + // Mocking remote storage read result for tp0 and tp1. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), remoteStorageFetchPartitions)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.of(REMOTE_FETCH_INFO), + Optional.empty() // Remote fetch result is returned successfully without error. + ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. 
+ Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released separately for tp1 (from tryComplete). + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1)); + // From onComplete, the locks will be released for both tp0 and tp1. tp0 because it was acquired from + // tryComplete and has remote fetch processed. tp1 will be reacquired in onComplete when we check for local log read. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1)); + assertTrue(shareFetch.isCompleted()); + // Share fetch response only contains the first remote storage fetch topic partition - tp0. 
+ assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) { LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, minBytes); LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class), @@ -1182,6 +1716,37 @@ public class DelayedShareFetchTest { return partitionMaxBytesStrategy; } + private Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLocalAndRemoteFetchResult( + Set<TopicIdPartition> localLogReadTopicIdPartitions, + Set<TopicIdPartition> remoteReadTopicIdPartitions) { + List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>(); + localLogReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.EMPTY), + Option.empty(), + -1L, + -1L, + -1L, + -1L, + -1L, + Option.empty(), + Option.empty(), + Option.empty() + )))); + remoteReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( + REMOTE_FETCH_INFO, + Option.empty(), + -1L, + -1L, + -1L, + -1L, + -1L, + Option.empty(), + Option.empty(), + Option.empty() + )))); + return CollectionConverters.asScala(logReadResults).toSeq(); + } + @SuppressWarnings("unchecked") private static BiConsumer<SharePartitionKey, Throwable> mockExceptionHandler() { return mock(BiConsumer.class); @@ -1194,6 +1759,7 @@ public class DelayedShareFetchTest { private LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = mock(LinkedHashMap.class); private PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class); private Time time = new MockTime(); + 
private final Optional<DelayedShareFetch.RemoteFetch> remoteFetch = Optional.empty(); private ShareGroupMetrics shareGroupMetrics = mock(ShareGroupMetrics.class); DelayedShareFetchBuilder withShareFetchData(ShareFetch shareFetch) { @@ -1243,7 +1809,8 @@ public class DelayedShareFetchTest { sharePartitions, partitionMaxBytesStrategy, shareGroupMetrics, - time); + time, + remoteFetch); } } }