jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1162047588
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int,
completeShutdown()
return
+ case callback: RequestChannel.CallbackRequest =>
+ try {
+ val originalRequest = callback.originalRequest
+
+ // If we've already executed a callback for this request, reset
the times and subtract the callback time from the
+ // new dequeue time. This will allow calculation of multiple
callback times.
+ // Otherwise, set dequeue time to now.
+ if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) {
+ val prevCallbacksTimeNanos =
originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) -
originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L)
+ originalRequest.callbackRequestCompleteTimeNanos = None
+ originalRequest.callbackRequestDequeueTimeNanos =
Some(time.nanoseconds() - prevCallbacksTimeNanos)
+ } else {
+ originalRequest.callbackRequestDequeueTimeNanos =
Some(time.nanoseconds())
+ }
+
+ currentRequest.set(originalRequest)
+ callback.fun()
+ if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty)
+ originalRequest.callbackRequestCompleteTimeNanos =
Some(time.nanoseconds())
+ } catch {
+ case e: FatalExitError =>
+ completeShutdown()
+ Exit.exit(e.statusCode)
+ case e: Throwable => error("Exception when handling request", e)
+ } finally {
+ currentRequest.remove()
+ }
+
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
trace(s"Kafka request handler $id on broker $brokerId handling
request $request")
+ currentRequest.set(request)
apis.handle(request, requestLocal)
} catch {
case e: FatalExitError =>
completeShutdown()
Exit.exit(e.statusCode)
case e: Throwable => error("Exception when handling request", e)
} finally {
+ currentRequest.remove()
request.releaseBuffer()
}
+ case RequestChannel.WakeupRequest => // We should handle this in
receiveRequest by polling callbackQueue.
Review Comment:
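We can add a warning log in this case: a `WakeupRequest` should already be handled in `receiveRequest` by polling `callbackQueue`, so reaching this branch is unexpected.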
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]