kamalcph commented on code in PR #14330:
URL: https://github.com/apache/kafka/pull/14330#discussion_r1314695377


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1066,50 +1066,53 @@ public static boolean 
isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata
         // Check for out of bound epochs between segment epochs and current 
leader epochs.
         Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
+
+        // FIXME: We have to remove the below check too for `DELETE_RECORDS` 
API to work properly.
         if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > 
leaderEpochs.lastKey()) {
-            LOGGER.debug("[{}] Remote segment {} is not within the partition 
leader epoch lineage. Remote segment epochs: {} and partition leader epochs: 
{}",
-                    segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+            LOGGER.debug("Segment {} is not within the partition leader epoch 
lineage. " +
+                            "Remote segment epochs: {} and partition leader 
epochs: {}",
+                    segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, 
leaderEpochs);
             return false;
         }
 
         for (Map.Entry<Integer, Long> entry : segmentLeaderEpochs.entrySet()) {
             int epoch = entry.getKey();
-            long offset = entry.getValue();
+//            long offset = entry.getValue();
 
             // If segment's epoch does not exist in the leader epoch lineage 
then it is not a valid segment.
             if (!leaderEpochs.containsKey(epoch)) {
-                LOGGER.debug("[{}]  Remote segment {}'s epoch {} is not within 
the leader epoch lineage. Remote segment epochs: {} and partition leader 
epochs: {}",
-                        segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
+                LOGGER.debug("Segment {} epoch {} is not within the leader 
epoch lineage. " +
+                                "Remote segment epochs: {} and partition 
leader epochs: {}",
+                        segmentMetadata.remoteLogSegmentId(), epoch, 
segmentLeaderEpochs, leaderEpochs);
                 return false;
             }
 
             // Segment's first epoch's offset should be more than or equal to 
the respective leader epoch's offset.
-            if (epoch == segmentFirstEpoch && offset < 
leaderEpochs.get(epoch)) {
-                LOGGER.debug("[{}]  Remote segment {}'s first epoch {}'s 
offset is less than leader epoch's offset {}.",
-                        segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
-                return false;
-            }
+//            if (epoch == segmentFirstEpoch && offset < 
leaderEpochs.get(epoch)) {
+//                LOGGER.debug("Segment {} first epoch {} offset is less than 
leader epoch offset {}.",
+//                        segmentMetadata.remoteLogSegmentId(), epoch, 
leaderEpochs.get(epoch));
+//                return false;
+//            }
 
             // Segment's end offset should be less than or equal to the 
respective leader epoch's offset.
             if (epoch == segmentLastEpoch) {
                 Map.Entry<Integer, Long> nextEntry = 
leaderEpochs.higherEntry(epoch);
                 if (nextEntry != null && segmentEndOffset > 
nextEntry.getValue() - 1) {
-                    LOGGER.debug("[{}]  Remote segment {}'s end offset {} is 
more than leader epoch's offset {}.",
-                            segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 
1);
+                    LOGGER.debug("Segment {} end offset {} is more than leader 
epoch offset {}.",
+                            segmentMetadata.remoteLogSegmentId(), 
segmentEndOffset, nextEntry.getValue() - 1);
                     return false;
                 }
             }
 
             // Next segment epoch entry and next leader epoch entry should be 
same to ensure that the segment's epoch
             // is within the leader epoch lineage.
             if (epoch != segmentLastEpoch && 
!leaderEpochs.higherEntry(epoch).equals(segmentLeaderEpochs.higherEntry(epoch)))
 {
-                LOGGER.debug("[{}]  Remote segment {}'s epoch {} is not within 
the leader epoch lineage. Remote segment epochs: {} and partition leader 
epochs: {}",
-                        segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
+                LOGGER.debug("Segment {} epoch {} is not within the leader 
epoch lineage. " +

Review Comment:
   The logger context already hold the partition information so removed the 
`partition` placeholder from the logs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to