Jose Armando Garcia Sancio created KAFKA-14238:
--------------------------------------------------

             Summary: KRaft replicas can delete segments not included in a snapshot
                 Key: KAFKA-14238
                 URL: https://issues.apache.org/jira/browse/KAFKA-14238
             Project: Kafka
          Issue Type: Bug
          Components: core, kraft
            Reporter: Jose Armando Garcia Sancio
             Fix For: 3.3.0


We see this in the log:
{code:java}
Deleting segment LogSegment(baseOffset=243864, size=9269150, lastModifiedTime=1662486784182, largestRecordTimestamp=Some(1662486784160)) due to retention time 604800000ms breach based on the largest record timestamp in the segment
{code}
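The log line reports a time-based breach. 604800000 ms is 7 days, which is the default {{retention.ms}}. The Java sketch below shows that comparison. It assumes a segment becomes eligible once the current time minus its largest record timestamp exceeds the retention. {{RetentionCheck}} and {{breachesRetention}} are hypothetical names, not Kafka APIs.

{code:java}
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a time-based retention check (not Kafka's implementation).
final class RetentionCheck {
    // 7 days in milliseconds: 604800000, the value reported in the log line.
    static final long RETENTION_MS = TimeUnit.DAYS.toMillis(7);

    // True when the newest record in the segment is older than the retention window.
    static boolean breachesRetention(long nowMs, long largestRecordTimestampMs, long retentionMs) {
        return nowMs - largestRecordTimestampMs > retentionMs;
    }
}
{code}

With the timestamp from the log line, {{breachesRetention(1662486784160L + RETENTION_MS + 1, 1662486784160L, RETENTION_MS)}} returns true. Nothing in a check of this shape looks at snapshots.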
This then causes {{KafkaRaftClient}} to throw an exception when sending batches to the listener:
{code:java}
java.lang.IllegalStateException: Snapshot expected since next offset of org.apache.kafka.controller.QuorumController$QuorumMetaLogListener@195461949 is 0, log start offset is 369668 and high-watermark is 547379
        at org.apache.kafka.raft.KafkaRaftClient.lambda$updateListenersProgress$4(KafkaRaftClient.java:312)
        at java.base/java.util.Optional.orElseThrow(Optional.java:403)
        at org.apache.kafka.raft.KafkaRaftClient.lambda$updateListenersProgress$5(KafkaRaftClient.java:311)
        at java.base/java.util.OptionalLong.ifPresent(OptionalLong.java:165)
        at org.apache.kafka.raft.KafkaRaftClient.updateListenersProgress(KafkaRaftClient.java:309){code}
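The failing check can be modelled as follows. A listener whose next offset is below the log start offset cannot read the records it still needs. It must first load a snapshot, and if none exists there is no way forward. The Java sketch below works under that assumption. {{ListenerProgressCheck}} and its methods are hypothetical and only mirror the shape of the check in the stack trace.

{code:java}
import java.util.OptionalLong;

// Hypothetical model of the check that fails in updateListenersProgress.
final class ListenerProgressCheck {
    // The listener can keep reading batches from the log only if its position is still retained.
    static boolean canReadFromLog(long nextOffset, long logStartOffset) {
        return nextOffset >= logStartOffset;
    }

    // Used when the listener is behind the log start offset: a snapshot must exist to bridge the gap.
    static long snapshotToLoad(long nextOffset, long logStartOffset, OptionalLong latestSnapshotEndOffset) {
        return latestSnapshotEndOffset.orElseThrow(() -> new IllegalStateException(String.format(
            "Snapshot expected since next offset is %d and log start offset is %d",
            nextOffset, logStartOffset)));
    }
}
{code}

The exception reports next offset 0 and log start offset 369668. With those values and no snapshot present, {{snapshotToLoad}} throws {{IllegalStateException}}, matching the behaviour reported above.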
The on-disk state of the cluster metadata partition confirms this:
{code:java}
 ls __cluster_metadata-0/
00000000000000369668.index
00000000000000369668.log
00000000000000369668.timeindex
00000000000000503411.index
00000000000000503411.log
00000000000000503411.snapshot
00000000000000503411.timeindex
00000000000000548746.snapshot
leader-epoch-checkpoint
partition.metadata
quorum-state{code}
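Notice that there are no {{checkpoint}} files (the KRaft snapshots), and the log does not have a segment at base offset 0. The log now starts at offset 369668, so nothing on disk covers offsets 0 through 369667. The listener still needs those offsets because its next offset is 0.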
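The same inspection can be scripted by parsing the 20-digit, zero-padded offsets in the file names. The Java sketch below does this. It assumes that KRaft snapshots are the files ending in {{.checkpoint}}. It treats {{.snapshot}} files as producer-state snapshots, which cannot stand in for missing metadata records. {{MetadataDirCheck}} is a hypothetical helper, and its consistency test is deliberately coarse.

{code:java}
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper for inspecting a __cluster_metadata-0 directory listing.
final class MetadataDirCheck {
    // Base offset of the oldest ".log" segment, parsed from its zero-padded file name.
    static OptionalLong oldestSegmentBaseOffset(List<String> fileNames) {
        return fileNames.stream()
            .filter(name -> name.endsWith(".log"))
            .mapToLong(name -> Long.parseLong(name.substring(0, name.indexOf('.'))))
            .min();
    }

    // True if at least one KRaft snapshot (a ".checkpoint" file) is present.
    // Note that "leader-epoch-checkpoint" does not match, because it has no ".checkpoint" suffix.
    static boolean hasKRaftSnapshot(List<String> fileNames) {
        return fileNames.stream().anyMatch(name -> name.endsWith(".checkpoint"));
    }

    // Coarse check: if the log no longer starts at 0, some KRaft snapshot must exist.
    static boolean isConsistent(List<String> fileNames) {
        long start = oldestSegmentBaseOffset(fileNames).orElse(0L);
        return start == 0L || hasKRaftSnapshot(fileNames);
    }
}
{code}

Run on the listing above, this reports 369668 as the oldest segment and finds no KRaft snapshot, so {{isConsistent}} returns false.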

This is happening because the {{LogConfig}} used for KRaft leaves the cleanup policy at its default, {{delete}}. That policy causes {{deleteOldSegments}} to also delete segments that breach the time- and size-based retention limits, even when no snapshot includes them. For KRaft, Kafka should only delete segments that breach the log start offset, that is, segments whose records all lie below it.

Log configuration for KRaft:
{code:scala}
      val props = new Properties()
      props.put(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
      props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
      props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
      props.put(LogConfig.FileDeleteDelayMsProp, Int.box(Defaults.FileDeleteDelayMs))
      // cleanup.policy, retention.ms and retention.bytes are not set, so the defaults
      // apply: delete, 604800000 ms (7 days) and -1 (unlimited)
      LogConfig.validateValues(props)
      val defaultLogConfig = LogConfig(props){code}
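One possible mitigation is to disable the time- and size-based limits explicitly. It is sketched here in Java rather than Scala, and it is not necessarily the change that resolves this issue. In Kafka, a value of -1 for {{retention.ms}} or {{retention.bytes}} means no limit. With both set, the corresponding paths in {{deleteOldSegments}} should have nothing to delete. The helper name and its parameters are hypothetical. The property keys are the standard topic-level names.

{code:java}
import java.util.Properties;

// Hypothetical helper mirroring the Scala snippet above, with time and size retention disabled.
final class KRaftLogProps {
    static Properties metadataLogProps(int maxBatchSizeInBytes, int segmentBytes,
                                       long segmentMillis, int fileDeleteDelayMs) {
        Properties props = new Properties();
        props.setProperty("max.message.bytes", Integer.toString(maxBatchSizeInBytes));
        props.setProperty("segment.bytes", Integer.toString(segmentBytes));
        props.setProperty("segment.ms", Long.toString(segmentMillis));
        props.setProperty("file.delete.delay.ms", Integer.toString(fileDeleteDelayMs));
        // -1 disables each limit, leaving the log-start-offset path as the
        // only one that removes segments.
        props.setProperty("retention.ms", "-1");
        props.setProperty("retention.bytes", "-1");
        return props;
    }
}
{code}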
Segment deletion code:
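{code:scala}
    def deleteOldSegments(): Int = {
      if (config.delete) {
        // cleanup.policy includes "delete": time- and size-based retention also apply
        deleteLogStartOffsetBreachedSegments() +
          deleteRetentionSizeBreachedSegments() +
          deleteRetentionMsBreachedSegments()
      } else {
        // Only segments entirely below the log start offset are removed
        deleteLogStartOffsetBreachedSegments()
      }
    }{code}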
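A simplified model shows how the two branches differ. This Java sketch is hypothetical. It keeps only the log-start-offset and time-based rules and omits size-based retention. It never deletes the last segment. It also adds an {{isKRaftMetadataLog}} flag to express the behaviour proposed above, where only segments entirely below the log start offset are eligible.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of segment selection; not Kafka's UnifiedLog implementation.
final class SegmentDeletion {
    static final class Segment {
        final long baseOffset;
        final long largestTimestampMs;

        Segment(long baseOffset, long largestTimestampMs) {
            this.baseOffset = baseOffset;
            this.largestTimestampMs = largestTimestampMs;
        }
    }

    // Leading segments whose records all lie below logStartOffset, i.e. the next
    // segment starts at or before it. The last segment is never returned.
    static List<Long> logStartOffsetBreached(List<Segment> segments, long logStartOffset) {
        List<Long> result = new ArrayList<>();
        for (int i = 0; i + 1 < segments.size(); i++) {
            if (segments.get(i + 1).baseOffset > logStartOffset) break;
            result.add(segments.get(i).baseOffset);
        }
        return result;
    }

    // Leading segments whose newest record is older than the retention window.
    static List<Long> retentionMsBreached(List<Segment> segments, long nowMs, long retentionMs) {
        List<Long> result = new ArrayList<>();
        if (retentionMs < 0) return result; // -1 disables time-based retention
        for (int i = 0; i + 1 < segments.size(); i++) {
            if (nowMs - segments.get(i).largestTimestampMs <= retentionMs) break;
            result.add(segments.get(i).baseOffset);
        }
        return result;
    }

    // Base offsets to delete. For the KRaft metadata log only the log-start-offset rule applies.
    static List<Long> toDelete(List<Segment> segments, long logStartOffset, long nowMs,
                               long retentionMs, boolean isKRaftMetadataLog) {
        List<Long> result = new ArrayList<>(logStartOffsetBreached(segments, logStartOffset));
        if (isKRaftMetadataLog) return result;
        // Both rules select a prefix of the segment list, so their union is the longer prefix.
        for (long base : retentionMsBreached(segments, nowMs, retentionMs)) {
            if (!result.contains(base)) result.add(base);
        }
        return result;
    }
}
{code}

Suppose no snapshot has advanced the log start offset beyond 0. Then the KRaft branch deletes nothing, while the {{delete}} branch removes every leading segment older than 7 days. That leaves the listener in the state described above.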



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
