satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1192210616
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -600,25 +623,210 @@ public String toString() { } } - long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException { - Optional<Long> offset = Optional.empty(); - Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition()); - if (maybeLog.isPresent()) { - UnifiedLog log = maybeLog.get(); - Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache(); - if (maybeLeaderEpochFileCache.isDefined()) { - LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); - OptionalInt epoch = cache.latestEpoch(); - while (!offset.isPresent() && epoch.isPresent()) { - offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); - epoch = cache.previousEpoch(epoch.getAsInt()); + public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { + int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; + TopicPartition tp = remoteStorageFetchInfo.topicPartition; + FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + + boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + + long offset = fetchInfo.fetchOffset; + int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + + Optional<UnifiedLog> logOptional = fetchLog.apply(tp); + OptionalInt epoch = OptionalInt.empty(); + + if (logOptional.isPresent()) { + Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache(); + if (leaderEpochCache.isDefined()) { + epoch = leaderEpochCache.get().epochForOffset(offset); + } + } + + Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent() + ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) + : Optional.empty(); + + if (!rlsMetadataOptional.isPresent()) { + String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; + throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " + + epochStr + " and partition " + tp + " which does not exist in remote tier."); + } + + RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); + int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); + InputStream remoteSegInputStream = null; + try { + // Search forward for the position of the last offset that is greater than or equal to the target offset + remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); + RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); + + RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); + + if (firstBatch == null) + return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, + includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty()); + + int firstBatchSize = firstBatch.sizeInBytes(); + // An empty record is sent instead of an incomplete batch when + // - there is no minimum-one-message constraint and + // - the first batch size is more than maximum bytes that can be sent. + // - for FetchRequest version 3 or above and + if (!remoteStorageFetchInfo.minOneMessage && + !remoteStorageFetchInfo.hardMaxBytesLimit && + firstBatchSize > maxBytes) { + return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY); + } + + int updatedFetchSize = + remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes; + + ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize); + int remainingBytes = updatedFetchSize; + + firstBatch.writeTo(buffer); + remainingBytes -= firstBatchSize; + + if (remainingBytes > 0) { + // read the input stream until min of (EOF stream or buffer's remaining capacity). + Utils.readFully(remoteSegInputStream, buffer); + } + buffer.flip(); + + FetchDataInfo fetchDataInfo = new FetchDataInfo( + new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos), + MemoryRecords.readableRecords(buffer)); + if (includeAbortedTxns) { + fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo, logOptional.get()); + } + + return fetchDataInfo; + } finally { + Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); + } + } + + private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + return indexCache.lookupOffset(remoteLogSegmentMetadata, offset); + } + + private FetchDataInfo addAbortedTransactions(long startOffset, Review Comment: We have the below test scenarios in 2.8.x in Scala. We plan to raise these in a followup PR as it requires a rewrite in Java and refactor from EasyMock to Mockito. testAddAbortedTransactions testCollectAbortedTransactionsIteratesNextRemoteSegment testCollectAbortedTransactionsIteratesNextLocalSegment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org