satishd commented on code in PR #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r985706396
##########
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##########
@@ -169,4 +181,140 @@ class ReplicaFetcherThread(name: String,
partition.truncateFullyAndStartAt(offset, isFuture = false)
}
+ /**
+ * It tries to build the required state for this partition from leader and
remote storage so that it can start
+ * fetching records from the leader.
+ */
+ override protected def buildRemoteLogAuxState(partition: TopicPartition,
+ currentLeaderEpoch: Int,
+ leaderLocalLogStartOffset:
Long,
+
epochForLeaderLocalLogStartOffset: Int,
+ leaderLogStartOffset: Long):
Long = {
+
+ def fetchEarlierEpochEndOffset(epoch:Int): EpochEndOffset = {
+ val previousEpoch = epoch - 1
+ // Find the end-offset for the epoch earlier to the given epoch from
the leader
+ val partitionsWithEpochs = Map(partition -> new
EpochData().setPartition(partition.partition())
+ .setCurrentLeaderEpoch(currentLeaderEpoch)
+ .setLeaderEpoch(previousEpoch))
+ val maybeEpochEndOffset =
leader.fetchEpochEndOffsets(partitionsWithEpochs).get(partition)
+ if (maybeEpochEndOffset.isEmpty) {
+ throw new KafkaException("No response received for partition: " +
partition);
+ }
+
+ val epochEndOffset = maybeEpochEndOffset.get
+ if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+ throw Errors.forCode(epochEndOffset.errorCode()).exception()
+ }
+
+ epochEndOffset
+ }
+
+ val log = replicaMgr.localLogOrException(partition)
+ val nextOffset = {
+ if (log.remoteStorageSystemEnable &&
log.config.remoteLogConfig.remoteStorageEnable) {
+ if (replicaMgr.remoteLogManager.isEmpty) throw new
IllegalStateException("RemoteLogManager is not yet instantiated")
+
+ val rlm = replicaMgr.remoteLogManager.get
+
+ // Find the respective leader epoch for (leaderLocalLogStartOffset -
1). We need to build the leader epoch cache
+ // until that offset
+ val previousOffsetToLeaderLocalLogStartOffset =
leaderLocalLogStartOffset - 1
+ val targetEpoch: Int = {
+ // If the existing epoch is 0, no need to fetch from earlier epoch
as the desired offset(leaderLogStartOffset - 1)
+ // will have the same epoch.
+ if (epochForLeaderLocalLogStartOffset == 0) {
+ epochForLeaderLocalLogStartOffset
+ } else {
+ // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+ val earlierEpochEndOffset =
fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+ // Check if the target offset lies with in the range of earlier
epoch. Here, epoch's end-offset is exclusive.
+ if (earlierEpochEndOffset.endOffset >
previousOffsetToLeaderLocalLogStartOffset) {
+ // Always use the leader epoch from returned
earlierEpochEndOffset.
+ // This gives the respective leader epoch, that will handle any
gaps in epochs.
+ // For ex, leader epoch cache contains:
+ // leader-epoch start-offset
+ // 0 20
+ // 1 85
+ // <2> - gap no messages were appended in this leader epoch.
+ // 3 90
+ // 4 98
+ // There is a gap in leader epoch. For leaderLocalLogStartOffset
as 90, leader-epoch is 3.
+ // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1,
end-offset as 90.
+ // So, for offset 89, we should return leader epoch as 1 like
below.
+ earlierEpochEndOffset.leaderEpoch()
+ } else epochForLeaderLocalLogStartOffset
+ }
+ }
+
+ val maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(partition,
targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
+
+ if (maybeRlsm.isPresent) {
+ val remoteLogSegmentMetadata = maybeRlsm.get()
+ // Build leader epoch cache, producer snapshots until
remoteLogSegmentMetadata.endOffset() and start
+ // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+ val nextOffset = remoteLogSegmentMetadata.endOffset() + 1
Review Comment:
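This offset is not `last-tiered-offset + 1`. It is derived from
`previousOffsetToLeaderLocalLogStartOffset`, which is `leaderLocalLogStartOffset - 1`:
the remote log segment metadata is fetched for that offset, and the next offset is that
segment's `endOffset() + 1`. This is aligned with what we described in KIP-405.
You can see this in how `maybeRlsm` is obtained: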
```
val maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch,
previousOffsetToLeaderLocalLogStartOffset)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]