divijvaidya commented on code in PR #14104:
URL: https://github.com/apache/kafka/pull/14104#discussion_r1275258903


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -558,6 +618,38 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
         verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithCorruptedIndexes() throws Exception {
+        long segmentStartOffset = 0L;
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create log segment, with 0 as log start offset
+        LogSegment segment = mock(LogSegment.class);
+
+        when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(segment);
+        when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+        when(mockLog.lastStableOffset()).thenReturn(150L);
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+        task.convertToFollower();

Review Comment:
   Why are we converting to follower here? Isn't it only the leader that 
performs the upload? Can we add an assertion here that this task is running as 
the leader?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -574,6 +575,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
                 }
             } catch (InterruptedException ex) {
                 throw ex;
+            } catch (CorruptIndexException ex) {

Review Comment:
   Correct me if I am wrong here, but the recovery path looks as follows:
   
   1. RLM will continue to try uploading these segments.
   2. The upload will fail continuously (until the indexes are recovered).
   3. The tiered storage lag metric will increase (I think we have an open 
ticket to create that metric).
   4. The operator will fix the corrupted segments.
   
   Alternatively, can we have an automated recovery path here that re-creates 
the indexes if they are corrupted before trying the upload again? This 
automated recovery can be done as part of a separate PR.
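
   A minimal sketch of what such an automated recovery path could look like. 
Everything in it is an assumption for illustration: `SegmentIndexes`, 
`rebuild()`, and `IndexRecoverySketch` are hypothetical names, not existing 
Kafka APIs, and `CorruptIndexException` is redefined locally only so the 
snippet compiles on its own.

```java
// Local stand-in for Kafka's CorruptIndexException so the sketch is self-contained.
class CorruptIndexException extends RuntimeException {
    CorruptIndexException(String message) {
        super(message);
    }
}

// Hypothetical view of a segment's indexes; not an existing Kafka interface.
interface SegmentIndexes {
    // Throws CorruptIndexException when any index fails its sanity check.
    void sanityCheck();

    // Assumed helper that re-creates the indexes (for example from the log file).
    void rebuild();
}

final class IndexRecoverySketch {
    private IndexRecoverySketch() { }

    // Returns true if the upload ran, false if the segment was skipped
    // because the indexes were still corrupted after one rebuild attempt.
    static boolean copyWithRecovery(SegmentIndexes indexes, Runnable upload) {
        try {
            indexes.sanityCheck();
        } catch (CorruptIndexException firstFailure) {
            // Single recovery attempt: rebuild, then check again.
            indexes.rebuild();
            try {
                indexes.sanityCheck();
            } catch (CorruptIndexException secondFailure) {
                // Still corrupted: skip the upload and leave it to the operator.
                return false;
            }
        }
        upload.run();
        return true;
    }
}
```

   The sketch attempts the rebuild only once per call, so a segment that 
cannot be repaired still ends up on the manual path described in the list above.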



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -599,6 +602,13 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
             File logFile = segment.log().file();
             String logFileName = logFile.getName();
 
+            // Corrupted indexes should not be uploaded to remote storage
+            // Example case: Local storage was filled, what caused index corruption
+            // We should avoid uploading such segments
+            segment.timeIndex().sanityCheck();
+            segment.offsetIndex().sanityCheck();
+            segment.txnIndex().sanityCheck();
+
             logger.info("Copying {} to remote storage.", logFileName);

Review Comment:
   An info log before the index checks would help us understand where the code 
exited when one of the sanity checks fails.
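
   A small sketch of the suggested ordering, under stated assumptions: 
`CopyLoggingSketch` and its parameters are hypothetical, the `info` consumer 
stands in for `logger.info`, the first message text is invented for 
illustration, and `CorruptIndexException` is redefined locally so the snippet 
compiles on its own.

```java
import java.util.function.Consumer;

// Local stand-in for Kafka's CorruptIndexException so the sketch is self-contained.
class CorruptIndexException extends RuntimeException {
    CorruptIndexException(String message) {
        super(message);
    }
}

final class CopyLoggingSketch {
    private CopyLoggingSketch() { }

    // `info` stands in for logger.info; `sanityChecks` stands in for the three
    // index sanityCheck() calls and may throw CorruptIndexException.
    static void copyLogSegment(String logFileName, Runnable sanityChecks, Consumer<String> info) {
        // Logged first, so a failed check still leaves a trace of where we were.
        info.accept("Checking indexes of " + logFileName + " before copying to remote storage.");
        sanityChecks.run();
        // Only reached when all indexes passed their checks.
        info.accept("Copying " + logFileName + " to remote storage.");
    }
}
```

   With this ordering, a log that contains the first message but not the second 
points at the sanity checks as the exit point.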



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -558,6 +618,38 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
         verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithCorruptedIndexes() throws Exception {

Review Comment:
   I might be missing something very obvious, but where are we corrupting the 
indexes in this test?
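
   One way the test could make the corruption explicit is to have an index's 
`sanityCheck()` throw `CorruptIndexException` and then verify that nothing was 
copied. In the real test this would presumably be a stub on a mocked index; the 
dependency-free sketch below uses hand-written fakes instead. `FakeIndex` and 
`FakeCopier` are hypothetical names, and `CorruptIndexException` is redefined 
locally so the snippet compiles on its own.

```java
// Local stand-in for Kafka's CorruptIndexException so the sketch is self-contained.
class CorruptIndexException extends RuntimeException {
    CorruptIndexException(String message) {
        super(message);
    }
}

// Hand-written fake index; a mocked index whose sanityCheck() throws
// would play the same role in the actual test.
final class FakeIndex {
    private final boolean corrupted;

    FakeIndex(boolean corrupted) {
        this.corrupted = corrupted;
    }

    void sanityCheck() {
        if (corrupted) {
            throw new CorruptIndexException("index failed sanity check");
        }
    }
}

// Hypothetical, heavily simplified copy path: check all indexes, then count the copy.
final class FakeCopier {
    int copiedSegments = 0;

    void copySegment(FakeIndex timeIndex, FakeIndex offsetIndex, FakeIndex txnIndex) {
        timeIndex.sanityCheck();
        offsetIndex.sanityCheck();
        txnIndex.sanityCheck();
        // Reached only when no index reported corruption.
        copiedSegments++;
    }
}
```

   A test built this way would pass a corrupted `FakeIndex` for one of the 
three indexes, expect the exception, and assert that `copiedSegments` is still 
zero.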



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
