kamalcph commented on code in PR #14330: URL: https://github.com/apache/kafka/pull/14330#discussion_r1314695377
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1066,50 +1066,53 @@ public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata // Check for out of bound epochs between segment epochs and current leader epochs. Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey(); Integer segmentLastEpoch = segmentLeaderEpochs.lastKey(); + + // FIXME: We have to remove the below check too for `DELETE_RECORDS` API to work properly. if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) { - LOGGER.debug("[{}] Remote segment {} is not within the partition leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", - segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs); + LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " + + "Remote segment epochs: {} and partition leader epochs: {}", + segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs); return false; } for (Map.Entry<Integer, Long> entry : segmentLeaderEpochs.entrySet()) { int epoch = entry.getKey(); - long offset = entry.getValue(); +// long offset = entry.getValue(); // If segment's epoch does not exist in the leader epoch lineage then it is not a valid segment. if (!leaderEpochs.containsKey(epoch)) { - LOGGER.debug("[{}] Remote segment {}'s epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", - segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs); + LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. " + + "Remote segment epochs: {} and partition leader epochs: {}", + segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs); return false; } // Segment's first epoch's offset should be more than or equal to the respective leader epoch's offset. - if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) { - LOGGER.debug("[{}] Remote segment {}'s first epoch {}'s offset is less than leader epoch's offset {}.", - segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch)); - return false; - } +// if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) { +// LOGGER.debug("Segment {} first epoch {} offset is less than leader epoch offset {}.", +// segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch)); +// return false; +// } // Segment's end offset should be less than or equal to the respective leader epoch's offset. if (epoch == segmentLastEpoch) { Map.Entry<Integer, Long> nextEntry = leaderEpochs.higherEntry(epoch); if (nextEntry != null && segmentEndOffset > nextEntry.getValue() - 1) { - LOGGER.debug("[{}] Remote segment {}'s end offset {} is more than leader epoch's offset {}.", - segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 1); + LOGGER.debug("Segment {} end offset {} is more than leader epoch offset {}.", + segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 1); return false; } } // Next segment epoch entry and next leader epoch entry should be same to ensure that the segment's epoch // is within the leader epoch lineage. if (epoch != segmentLastEpoch && !leaderEpochs.higherEntry(epoch).equals(segmentLeaderEpochs.higherEntry(epoch))) { - LOGGER.debug("[{}] Remote segment {}'s epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", - segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs); + LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. " + Review Comment: The logger context already hold the partition information so removed the `partition` placeholder from the logs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org