hudeqi commented on code in PR #13696:
URL: https://github.com/apache/kafka/pull/13696#discussion_r1205574820


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -460,8 +460,16 @@ abstract class AbstractFetcherThread(name: String,
     partitionMapLock.lockInterruptibly()
     try {
       Option(partitionStates.stateValue(topicPartition)).foreach { state =>
-        val newState = PartitionFetchState(state.topicId, math.min(truncationOffset, state.fetchOffset),
-          state.lag, state.currentLeaderEpoch, state.delay, state = Truncating,
+        var lag = state.lag

Review Comment:
   > Isn't the lag recalculated on the next fetch iteration ([here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L362))?
   
   Hi Hangleton, thanks for your review! As you said, under normal
   circumstances the lag is recalculated on the next fetch. However, if an
   exception occurs before the lag is updated (for example, processPartitionData
   throws an exception due to a disk error), or if logAppendInfoOpt is None,
   the lag keeps its previous, now incorrect value, either indefinitely or for
   some period of time. The partition has clearly been truncated, yet the
   displayed lag is still very large, which deviates greatly from what we would
   expect. The modification here is therefore a defensive measure that avoids
   such situations. @Hangleton
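
   To make the idea concrete, here is a minimal sketch of the defensive reset described above. It is not the code from this PR: `FetchState`, `TruncationSketch` and `markForTruncation` are hypothetical, simplified stand-ins for `PartitionFetchState` and the truncation-marking logic, and the sketch assumes the lag is held as an `Option[Long]` that can be cleared.

```scala
// Hypothetical, simplified stand-in for Kafka's PartitionFetchState.
// Only the fields needed for the illustration are modelled.
final case class FetchState(fetchOffset: Long, lag: Option[Long], truncating: Boolean)

object TruncationSketch {
  // Marks a partition for truncation. If truncation moves the fetch offset
  // backwards, the previously computed lag no longer describes the log, so it
  // is cleared (None) instead of being carried over. The next successful fetch
  // can then recompute it; if that fetch fails, no stale value is reported.
  def markForTruncation(state: FetchState, truncationOffset: Long): FetchState = {
    val newOffset = math.min(truncationOffset, state.fetchOffset)
    val newLag = if (newOffset < state.fetchOffset) None else state.lag
    FetchState(newOffset, newLag, truncating = true)
  }
}
```

   With this shape, a state at offset 100 with a lag of 5000 that is truncated to offset 40 ends up with no lag value at all, while a truncation offset at or beyond the current fetch offset leaves the existing lag untouched.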



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
