divijvaidya commented on code in PR #14104:
URL: https://github.com/apache/kafka/pull/14104#discussion_r1275258903
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -558,6 +618,38 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
         verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithCorruptedIndexes() throws Exception {
+        long segmentStartOffset = 0L;
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create log segment, with 0 as log start offset
+        LogSegment segment = mock(LogSegment.class);
+
+        when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(segment);
+        when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+        when(mockLog.lastStableOffset()).thenReturn(150L);
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+        task.convertToFollower();

Review Comment:
   Why are we converting to follower here? Isn't it only the leader that performs the upload? Can we add an assertion here that this task is running as the leader?

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -574,6 +575,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         } catch (InterruptedException ex) {
             throw ex;
+        } catch (CorruptIndexException ex) {

Review Comment:
   Correct me if I am wrong here, but the recovery path looks as follows:
   1. RLM will continue to try uploading these segments.
   2. It will fail continuously (until the indexes are recovered).
   3. The tiered storage lag metric will increase (we have an open ticket to create that metric, I think).
   4. The operator will fix the corrupted segments.

   Alternatively, can we have an automated recovery path here where we re-create the indexes if they are corrupted before trying the upload again? This automated recovery can be done as part of a separate PR (a rough sketch of that direction follows below).
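To make the automated-recovery idea concrete, here is a rough, hypothetical sketch of what the new catch block could grow into. The rebuildCorruptedIndexes(...) helper does not exist in Kafka and only stands in for whatever logic would regenerate the .index/.timeindex/.txnindex files; the topicIdPartition field is assumed from the surrounding class.

    } catch (InterruptedException ex) {
        throw ex;
    } catch (CorruptIndexException ex) {
        // Hypothetical recovery: rebuild the corrupted index files for this partition and let the
        // next scheduled RLMTask iteration retry the copy, instead of failing on every attempt.
        logger.error("Index corruption detected for {}, rebuilding indexes before the next copy attempt",
                topicIdPartition, ex);
        rebuildCorruptedIndexes(log);    // placeholder helper, not an existing Kafka API
    }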
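On the earlier comment about task.convertToFollower() in the test: if the intent is to exercise the leader path, a minimal sketch of the suggested assertion could look like the following. It assumes RLMTask exposes convertToLeader(int) and isLeader() to the test; both are assumptions here, and the epoch value is arbitrary.

    RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
    // Drive the task as leader and make the precondition explicit, so the test fails loudly
    // if leadership handling ever changes.
    task.convertToLeader(2);
    assertTrue(task.isLeader(), "segment copy should only be exercised from the leader task");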
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -599,6 +602,13 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
         File logFile = segment.log().file();
         String logFileName = logFile.getName();
 
+        // Corrupted indexes should not be uploaded to remote storage
+        // Example case: Local storage was filled, what caused index corruption
+        // We should avoid uploading such segments
+        segment.timeIndex().sanityCheck();
+        segment.offsetIndex().sanityCheck();
+        segment.txnIndex().sanityCheck();
+
         logger.info("Copying {} to remote storage.", logFileName);

Review Comment:
   An info log prior to checking the indexes would be helpful to understand where the code exited when one of the sanity checks fails (a sketch of this is included below).

##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -558,6 +618,38 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
         verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithCorruptedIndexes() throws Exception {

Review Comment:
   I might be missing something very obvious, but where are we corrupting the indexes in this test?
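One plausible way for the test to produce that corruption — an assumption about the missing setup, not something taken from the PR — is to stub the mocked segment's time index so that sanityCheck() throws, which is exactly the condition the new guard in copyLogSegment() reacts to (the TimeIndex and CorruptIndexException class names are assumed to be the ones the PR uses).

    TimeIndex corruptedTimeIndex = mock(TimeIndex.class);
    // sanityCheck() is void, so doThrow(..).when(..) is the Mockito idiom for stubbing it to fail.
    doThrow(new CorruptIndexException("corrupted time index")).when(corruptedTimeIndex).sanityCheck();
    when(segment.timeIndex()).thenReturn(corruptedTimeIndex);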
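Going back to the comment on copyLogSegment() above, a sketch of the extra info log placed before the sanity checks, so that a thrown CorruptIndexException can be tied to the copy attempt that triggered it. The wording of the new log line is made up here; the existing "Copying ..." line stays for the happy path.

    logger.info("Checking indexes of {} before copying it to remote storage.", logFileName);

    // Corrupted indexes should not be uploaded to remote storage.
    segment.timeIndex().sanityCheck();
    segment.offsetIndex().sanityCheck();
    segment.txnIndex().sanityCheck();

    logger.info("Copying {} to remote storage.", logFileName);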