hudeqi commented on code in PR #13696: URL: https://github.com/apache/kafka/pull/13696#discussion_r1205574820
########## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ########## @@ -460,8 +460,16 @@ abstract class AbstractFetcherThread(name: String, partitionMapLock.lockInterruptibly() try { Option(partitionStates.stateValue(topicPartition)).foreach { state => - val newState = PartitionFetchState(state.topicId, math.min(truncationOffset, state.fetchOffset), - state.lag, state.currentLeaderEpoch, state.delay, state = Truncating, + var lag = state.lag Review Comment: > Isn't the lag recalculated on the next fetch iteration ([here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L362))? Hi Hangleton, thanks for your review! As you said, under normal circumstances, the lag will be recalculated on the next fetch. However, if an exception occurs before updating the lag (for example, processPartitionData throws an exception due to a disk error) or the return value of logAppendInfoOpt is None, then the lag value will always be or last for a period of time as the previous error value (obviously truncate, lag display is still very large, which seems to deviate greatly from our understanding). Therefore, the modification here is a defensive measure that can effectively avoid such situations. @Hangleton -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org