Hi Jun, Thanks for your comments. 1. I just replied in the discussion thread about the positive change this KIP can still bring if implemented on the latest trunk, which includes the async ZK operations for KAFKA-5642. The evaluation is done using an integration test. In production, we have not upgraded to Kafka 1.1 yet, and the code we are currently running does not include async ZK operations, therefore I don't have any real usage result.
2. Thanks for bringing this up. I haven't considered this setting, and the existing proposal in this KIP would make data requests and controller requests share a memory poll of size specified by the config queued.max.request.bytes. The downside is that if there is memory pressure, controller requests may be blocked from being read from a socket and does not get prioritized at the socket layer. If we have a separate bytes limit for the controller requests, I imagine there would be a separate memory pool dedicated to controller requests. Also it requires the processors to tell connections from a controller apart from connections from other brokers or clients, which would probably require a dedicated port for the controller? IMO, this change is mainly driven by the memory pressure, kind of an orthogonal issue, and we can address it with a separate KIP if desired. Please let me know what you think. 3. I plans to change the implementation of the method receiveRequest(timeout: Long) in the RequestChannel class as follows: val controllerRequest = controllerRequestQueue.poll() if (controllerRequest != null) { controllerRequest } else { dataRequestQueue.poll(timeout, TimeUnit.MILLISECONDS) } with this implementation, there is no need to explicitly choose a request handler thread to wake up depending on the types of request enqueued, and if a controller request arrives while some request handler threads are blocked on an empty data request queue, they will simply timeout and call the receiveRequest method again. In terms of performance, it means that in the worst case, for a controller request that just missed the receiveRequest call, it can be delayed for as long as the timeout parameter, which is hard coded to be 300 milliseconds. If there is just one request handler thread, the average delay is 150 milliseconds assuming the chance of a controller request arriving at any particular time is the same. With N request handler threads, the average delay is 150/N milliseconds, which does not seem to be a problem. We have considered waking up of request handler threads based on which queue the request handler threads are blocked, and that design was turned down because of its complexity. The design can be found at here <https://cwiki.apache.org/confluence/display/KAFKA/Old+controller+request+queue+design> . If you mean a general purpose priority queue such as the java.util.PriorityQueue, we also have considered it and turned down the design because - The readily available class java.util.PriorityQueue is unbounded and we'll need to implement a bounded version - We would still like to have the FIFO semantics on both the controller request queue and data request queue, which conceptually does not fit very well with a general purpose priority queue, e.g. we would probably need to use the enqueue time to enforce FIFO semantics. - A typical operation on the priority queue is O(log n), whereas the sample implementation above gives O(1) performance regardless of the size of both queues. 4. For the two APIs sendRequest and receiveRequest, since we are only changing their implementation, not the API itself the two metrics will support two queues and the meaning of "Idle" still holds: *Before this KIPAfter this KIPNetworkProcessorAvgIdlePercentidle = blocked on selectnot idle includes being blocked on requestQueueidle = blocked on selectnot idle includes being blocked on either controller request queue or data request queueRequestHandlerAvgIdlePercentidle = blocked on reading from requestQueue idle = taking a request from the controller request queue, or blocked on reading from the data request queue* Regards, Lucas On Fri, Jun 29, 2018 at 11:22 AM, Jun Rao <j...@confluent.io> wrote: > Hi, Lucas, > > Thanks for the KIP. A few comments below. > > 1. As Eno mentioned in the discussion thread, I am wondering how much of > this is still an issue after KAFKA-5642. With that fix, the requests from > the controller to the brokers are batched in all the common cases. Have you > deployed Kafka 1.1? What's the request queue time and the request queue > size that you have observed in production? > > 2. For the request queue, currently we can also bound it by size > through queued.max.request.bytes. Should we consider the same for the > control queue? > > 3. Implementation wise, currently the request handler threads just block on > the request queue when the queue is empty. With two queues, it seems that > we need to wake up a request handler thread blocked on one queue, when > another queue gets a request? Have we considered just making the request > queue a priority queue? > > 4. Related to 3, currently we have 2 > metrics NetworkProcessorAvgIdlePercent and RequestHandlerAvgIdlePercent > that measure the utilization of the network and the request handler thread > pools. They are computed by measuring the amount of time waiting on the > request queue. Will these 2 metrics be extended to support 2 request > queues. > > Jun > > > On Mon, Jun 18, 2018 at 1:04 PM, Lucas Wang <lucasatu...@gmail.com> wrote: > > > Hi All, > > > > I've addressed a couple of comments in the discussion thread for KIP-291, > > and > > got no objections after making the changes. Therefore I would like to > start > > the voting thread. > > > > KIP: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-291% > > 3A+Have+separate+queues+for+control+requests+and+data+requests > > > > Thanks for your time! > > Lucas > > >