artemlivshits commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1163676408
########## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ########## @@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int, completeShutdown() return + case callback: RequestChannel.CallbackRequest => + try { + val originalRequest = callback.originalRequest + + // If we've already executed a callback for this request, reset the times and subtract the callback time from the + // new dequeue time. This will allow calculation of multiple callback times. Review Comment: Right now, this framework just accounts for a single logical request that does a few non-blocking calls and continues serially after each call. We could probably add a check that prohibits adding a second .wrap during request / callback execution. ########## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ########## @@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int, completeShutdown() return + case callback: RequestChannel.CallbackRequest => + try { + val originalRequest = callback.originalRequest + + // If we've already executed a callback for this request, reset the times and subtract the callback time from the + // new dequeue time. This will allow calculation of multiple callback times. + // Otherwise, set dequeue time to now. + if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) { + val prevCallbacksTimeNanos = originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L) + originalRequest.callbackRequestCompleteTimeNanos = None + originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds() - prevCallbacksTimeNanos) + } else { + originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds()) + } + + currentRequest.set(originalRequest) + callback.fun() + if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty) Review Comment: Basically, we should search for `apiLocalCompleteTimeNanos` and update the callbackRequestCompleteTimeNanos in similar places. My understanding is that we can send the reply during request / callback and it may complete (concurrently) before we end the request / callback or after we end request / callback. In which case we want the end time to be the earliest of the two. Usually, the way it's done is the start time is set at the start of the request / callback processing and the end time is set after, but if the end time is not set then the metric is reported, then we just report the current time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org