Hi, Radai,

Sorry for the late response. How should the benchmark results be interpreted? The higher the ops/s, the better? It would also be useful to test this out on LinkedIn's traffic, with enough socket connections, to see if there is any performance degradation.
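For context: these are JMH throughput numbers. Each figure is the number of completed benchmark-method invocations per second, so higher ops/s is indeed better, and the ± column is the error bound over the 5 measured iterations ("Cnt"). A benchmark method of the kind quoted below presumably looks roughly like this sketch; it is hypothetical, not taken from radai's repo, and the AtomicLong merely stands in for the pool's bookkeeping:

    import java.nio.ByteBuffer;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import org.openjdk.jmh.annotations.*;

    // Hypothetical sketch of a pool-allocation throughput benchmark.
    @State(Scope.Benchmark)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.SECONDS)
    public class PoolBenchmarkSketch {
        private final AtomicLong available = new AtomicLong(64L * 1024 * 1024);

        @Benchmark
        public ByteBuffer alloc_1k() {
            available.addAndGet(-1024);                    // "allocate" 1 KB from the pool
            ByteBuffer buffer = ByteBuffer.allocate(1024); // the buffer allocation being measured
            available.addAndGet(1024);                     // release the capacity back
            return buffer;                                 // returned so JMH cannot dead-code-eliminate it
        }
    }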
Also, there is a separate proposal, KIP-81, to bound the consumer memory usage. Perhaps you can chime in on whether this proposal can be utilized there too.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer

Thanks,

Jun

On Tue, Sep 27, 2016 at 10:23 AM, radai <radai.rosenbl...@gmail.com> wrote:
> Hi Jun,
>
> 10 - mute/unmute functionality has been added in https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting. I have yet to run stress tests to see how it behaves versus without muting.
>
> 11 - I've added a SimplePool implementation (nothing more than an AtomicLong, really) and compared it with my GC pool (that uses weak refs) - https://github.com/radai-rosenblatt/kafka-benchmarks/tree/master/memorypool-benchmarks. The results show no noticeable difference. What the results _do_ show, though, is that for large requests (1M) performance drops very sharply. Since the SimplePool is essentially identical to current Kafka code behaviour (the benchmark never reaches out-of-memory conditions), this suggests to me that Kafka performance for large requests suffers greatly from the cost of allocating (and releasing) large buffers instead of actually pooling them for later re-use. This means that an implementation of a memory pool that actually pools ( :-) ) is very likely to improve broker performance for large requests.
>
> 12 - if there was a single thread iterating over selection keys, then stopping at the 1st unsatisfiable request might work (if iteration order over selection keys is deterministic, which is OS-dependent). However, Kafka spawns multiple selectors sharing the same pool, so I doubt the approach would gain anything. Also notice that the current code already shuffles the selection keys if memory is low (<10%) to try and guarantee fairness.
>
> Attached are the benchmark results for the pool implementations:
>
> Benchmark                                        Mode  Cnt        Score         Error  Units
> GarbageCollectedMemoryPoolBenchmark.alloc_100k  thrpt    5   198272.519 ±  16045.965  ops/s
> GarbageCollectedMemoryPoolBenchmark.alloc_10k   thrpt    5  2781439.307 ± 185287.072  ops/s
> GarbageCollectedMemoryPoolBenchmark.alloc_1k    thrpt    5  6029199.952 ± 465936.118  ops/s
> GarbageCollectedMemoryPoolBenchmark.alloc_1m    thrpt    5    18464.272 ±    332.861  ops/s
> SimpleMemoryPoolBenchmark.alloc_100k            thrpt    5   204240.066 ±   2207.619  ops/s
> SimpleMemoryPoolBenchmark.alloc_10k             thrpt    5  3000794.525 ±  83510.836  ops/s
> SimpleMemoryPoolBenchmark.alloc_1k              thrpt    5  5893671.778 ± 274239.541  ops/s
> SimpleMemoryPoolBenchmark.alloc_1m              thrpt    5    18728.085 ±    792.563  ops/s
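radai's SimplePool, "nothing more than an AtomicLong really", presumably amounts to something like the sketch below. Names and exact semantics are assumptions, not his actual code; the allocate-if-any-capacity-remains behaviour (letting the counter go negative) matches his description of the pool elsewhere in the thread:

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of a non-blocking pool that is little more than an AtomicLong.
    public class SimplePoolSketch {
        private final AtomicLong available;

        public SimplePoolSketch(long sizeBytes) {
            this.available = new AtomicLong(sizeBytes);
        }

        /** True when the pool has no capacity left. */
        public boolean exhausted() {
            return available.get() <= 0;
        }

        /** Returns a buffer, or null if the pool is exhausted; never blocks. */
        public ByteBuffer tryAllocate(int sizeBytes) {
            if (exhausted())
                return null;                 // caller is expected to retry on a later poll
            available.addAndGet(-sizeBytes); // any capacity goes: the counter may go negative,
                                             // which avoids starving large requests
            return ByteBuffer.allocate(sizeBytes);
        }

        /** Returns the buffer's capacity to the pool. */
        public void release(ByteBuffer buffer) {
            available.addAndGet(buffer.capacity());
        }
    }

The near-identical scores for the two pools above would then mean the weak-reference tracking in the GC pool adds no measurable cost on top of this.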
> On Sat, Sep 24, 2016 at 9:07 AM, Jun Rao <j...@confluent.io> wrote:
> Hi, Radai,
>
> For 10, yes, we don't want the buffer pool to wake up the selector every time some memory is freed up. We only want to do that when there are pending requests to the buffer pool that were not honored due to not enough memory.
>
> For 11, we probably want to be a bit careful with Weak References. In https://issues.apache.org/jira/browse/KAFKA-1989, we initially tried an implementation based on Weak References, but abandoned it due to too much GC overhead. It probably also makes the code a bit harder to understand. So, perhaps it would be better if we can avoid it.
>
> For 12, that's a good point. I thought the selector will do some shuffling for fairness. Perhaps we should stop allocating from the buffer pool when we see the first key whose memory can't be honored?
>
> Thanks,
>
> Jun
>
> On Sat, Sep 24, 2016 at 8:44 AM, radai <radai.rosenbl...@gmail.com> wrote:
> Hi Jun,
>
> 10 - I'll add this functionality to the mute/unmute branch. As every mute/unmute operation is O(#connections / #selectorThreads), maybe a watermark approach is better than waking when _any_ memory is available?
>
> 11 - "GC notifications" are done by using a ReferenceQueue (https://docs.oracle.com/javase/8/docs/api/java/lang/ref/ReferenceQueue.html) in combination with weak references to allocated buffers. When a buffer is reclaimed by the GC, the corresponding weak ref to it is enqueued. The pool maintains a set of outstanding buffer IDs (every allocated buffer gets a unique id - basically a sequence). A buffer explicitly returned has its id removed from the tracking set and the weak reference to it destroyed, so its reference will never be enqueued by the GC even if it is GC'ed later. An enqueued reference (which indicates a buffer not returned to the pool) also carries the buffer id, which is then removed from the outstanding buffers set and the memory marked as available (and a warning printed). The pool has a background thread dedicated to reading references out of the queue (which under normal conditions remains blocked forever). (See the sketch just after this message.)
>
> 12 - the issue here is that a single "large" request (say 1MB) can get blocked indefinitely under high pressure from much smaller requests (say 1KB) keeping memory utilization close to 100%. If we don't care about potential starvation, the change is in a single condition. I'll make this configurable.
>
> 13 - I'll change the parameter name.
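A minimal sketch of the ReferenceQueue-based safety net radai describes in point 11. It is simplified and uses assumed names; in particular it keys the tracking map on buffer identity instead of the explicit id sequence he mentions, but the mechanics are the same:

    import java.lang.ref.ReferenceQueue;
    import java.lang.ref.WeakReference;
    import java.nio.ByteBuffer;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch: buffers that are never release()d are detected when the GC reclaims
    // them, and their capacity is returned to the pool (with a warning).
    public class GcTrackingPoolSketch {
        private final AtomicLong available;
        private final ConcurrentHashMap<BufferReference, Integer> outstanding = new ConcurrentHashMap<>();
        private final ReferenceQueue<ByteBuffer> gcQueue = new ReferenceQueue<>();

        public GcTrackingPoolSketch(long sizeBytes) {
            available = new AtomicLong(sizeBytes);
            Thread reaper = new Thread(this::reap, "pool-leak-reaper");
            reaper.setDaemon(true);
            reaper.start(); // under normal conditions this thread stays blocked forever
        }

        public ByteBuffer tryAllocate(int sizeBytes) {
            if (available.get() <= 0)
                return null;
            available.addAndGet(-sizeBytes);
            ByteBuffer buffer = ByteBuffer.allocate(sizeBytes);
            outstanding.put(new BufferReference(buffer, gcQueue), sizeBytes);
            return buffer;
        }

        public void release(ByteBuffer buffer) {
            // explicitly returned: stop tracking it, so the reaper ignores its reference later
            Integer capacity = outstanding.remove(new BufferReference(buffer, null));
            if (capacity != null)
                available.addAndGet(capacity);
        }

        private void reap() {
            while (true) {
                try {
                    BufferReference ref = (BufferReference) gcQueue.remove(); // blocks
                    Integer capacity = outstanding.remove(ref);
                    if (capacity != null) { // allocated but never release()d: a leak
                        available.addAndGet(capacity);
                        System.err.println("WARN: reclaimed " + capacity + " bytes that were never released");
                    }
                } catch (InterruptedException e) {
                    return;
                }
            }
        }

        // A weak reference usable as a map key: compares referents by identity.
        private static final class BufferReference extends WeakReference<ByteBuffer> {
            private final int hash;

            BufferReference(ByteBuffer buffer, ReferenceQueue<ByteBuffer> queue) {
                super(buffer, queue);
                this.hash = System.identityHashCode(buffer);
            }

            @Override
            public int hashCode() {
                return hash;
            }

            @Override
            public boolean equals(Object o) {
                if (this == o)
                    return true;
                if (!(o instanceof BufferReference))
                    return false;
                ByteBuffer mine = get();
                return mine != null && mine == ((BufferReference) o).get();
            }
        }
    }

This also makes Jun's GC-overhead concern concrete: every allocation creates an extra tracked object, though radai's numbers above suggest the cost is not visible at these request sizes.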
> On Fri, Sep 23, 2016 at 3:38 PM, Jun Rao <j...@confluent.io> wrote:
> Hi, Radai,
>
> Thanks for the updated KIP. A few more questions/comments below.
>
> 10. For "the mute/unmute happens just before poll(), which means as a worst case there will be no reads for 300ms if memory was unavailable", I am thinking that the memory pool could track whether there is any pending request and wake up the selector when memory is released and a request is pending. This way, poll() doesn't have to wait for the timeout if memory frees up early.
>
> 11. For "to facilitate faster implementation (as a safety net) the pool will be implemented in such a way that memory that was not release()ed (but still garbage collected) would be detected and "reclaimed". this is to prevent "leaks" in case of code paths that fail to release() properly.", could you explain a bit at the high level how this is done?
>
> 12. For "As the pool would allow any size request if it has any capacity available, the actual memory bound is queued.max.bytes + socket.request.max.bytes.", intuitively it seems the pool should only give the Buffer back if it has enough available bytes. Then the request memory can be bounded by queued.max.bytes. We can validate that queued.max.bytes is at least socket.request.max.bytes.
>
> 13. For the naming, it seems request.queue.max.bytes is clearer than queue.max.bytes.
>
> Jun
>
> On Thu, Sep 22, 2016 at 10:53 AM, radai <radai.rosenbl...@gmail.com> wrote:
> As discussed in the KIP call, I have updated the kip-72 page (https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests) to record both configuration validations and implementation concerns. I've also implemented channel muting/unmuting in response to memory pressure. It's available as a separate branch here - https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting. The implementation without muting is here - https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.
>
> The mute/unmute happens just before poll(), which means as a worst case there will be no reads for 300ms if memory was unavailable (that's the timeout until the next poll). Perhaps a design with dedicated read threads could do better (such a thread could actually block waiting for memory), but that would be a giant change.
>
> On Tue, Sep 13, 2016 at 9:20 AM, radai <radai.rosenbl...@gmail.com> wrote:
> The specific memory pool implementation I wrote will allocate _any_ amount you request if it has _any_ memory available (so if it has 1 byte available and you ask for 1MB you will get 1MB, and the counter will go negative). This was done to avoid issues with starvation of large requests. Other implementations may be more strict. To me this means that generally it's not a simple "have memory" vs. "no memory" split (which gets worse under a hypothetical tiered pool scheme for QoS).
>
> To allow this flexibility in pool implementation I must preserve the amount of memory required. Once read from the channel I can't put it back, so I store it?
>
> On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
> Is there any value in allowing the 4-byte size to be read even when the request memory limit has been reached? If not, you can disable OP_READ interest for all channels that are ready inside Selector.poll() when the memory limit has been reached, and re-enable it before returning from poll(). Perhaps a listener that is invoked when the MemoryPool moves from unavailable to available state can wake up the selector. The changes for this should be fairly contained, without any additional channel state. And it would avoid the overhead of polls that return immediately even when progress cannot be made because the memory limit has been reached.
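Rajini's suggestion might look roughly like the following, sketched against raw java.nio with assumed names (the real Selector/KafkaChannel plumbing in Kafka is more involved; SimplePoolSketch is the hypothetical pool from the earlier sketch):

    import java.io.IOException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;

    // Sketch: mute reads for the duration of one poll while the pool is exhausted,
    // and unmute before poll() returns, so no extra channel state is needed.
    public class MutingPollSketch {
        private final Selector nioSelector;
        private final SimplePoolSketch pool;

        MutingPollSketch(Selector nioSelector, SimplePoolSketch pool) {
            this.nioSelector = nioSelector;
            this.pool = pool;
            // hypothetical hook, per Rajini's listener idea: wake a blocked select()
            // as soon as the pool transitions back to available:
            // pool.onAvailable(nioSelector::wakeup);
        }

        void pollOnce(long timeoutMs) throws IOException {
            boolean muted = pool.exhausted();
            if (muted)
                setReadInterest(false); // no memory to read into, so stop reading

            nioSelector.select(timeoutMs);
            // ... service connects, writes, and (when not muted) reads ...

            if (muted)
                setReadInterest(true);  // re-enable before returning from poll()
        }

        private void setReadInterest(boolean on) {
            for (SelectionKey key : nioSelector.keys()) {
                if (!key.isValid())
                    continue;
                int ops = key.interestOps();
                key.interestOps(on ? ops | SelectionKey.OP_READ : ops & ~SelectionKey.OP_READ);
            }
        }
    }

A real implementation would additionally have to avoid unmuting channels that are muted for unrelated reasons (for example, a request already in flight), which is exactly the complication radai raises further down the thread.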
> On Tue, Sep 13, 2016 at 12:31 AM, radai <radai.rosenbl...@gmail.com> wrote:
> Hi Jun,
>
> Yes, you're right - right now the next select() call will return immediately with the same set of keys as earlier (at least), as they were not previously handled (no memory). My assumption is that this happens under considerable load - something has to be occupying all this memory. Also, this happens in the context of SocketServer.Processor.run():
>
>     while (isRunning) {
>       configureNewConnections()
>       processNewResponses()
>       poll()                      <------ HERE
>       processCompletedReceives()
>       processCompletedSends()
>       processDisconnected()
>     }
>
> Even within poll(), things like finishConnection(), prepare(), and write()s can still make progress under low-memory conditions. And given the load, there's probably progress to be made in processCompletedReceives(), processCompletedSends() and processDisconnected().
>
> If there's progress to be made in other things, it's likely that the next call to poll() will not happen immediately, and so the loop won't be that tight. In order for this to devolve into true busy-waiting you would need a situation where no progress can be made on any in-progress requests and no responses to send out?
>
> If my assumption does not hold then you are correct, and the selector.poll(300) currently hardcoded in SocketServer.Processor.poll() would need to be replaced with something more complicated. My biggest point of concern, though, is that the resulting code would be complicated and would couple the Selector to the memory pool very tightly. Under my current patch the Selector needs the memory pool only to pass to channels when they are built. This would allow different memory pools relatively easily, for things like reserving memory for cross-broker replication and high-SLA connections. A tighter coupling would make any such future modification hard.
> On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao <j...@confluent.io> wrote:
> Hi, Radai,
>
> Thanks for the reply. I still have a follow-up question on #2.
>
> My understanding is that in your proposal, the selector will now first read the size of the Receive. If there is not enough memory, it has to turn off the READ interest bit for the corresponding KafkaChannel. Otherwise, subsequent selector.poll() calls will always return immediately, adding unnecessary overhead. If you do that, the Selector will need to know when to turn on the READ interest bit again. It may not be enough to wait until the next poll call to do this check, since the timeout used by poll() could be arbitrarily large. So, it seems that some kind of coordination between the Selector and the buffer pool is needed?
>
> Jun
>
> On Thu, Sep 8, 2016 at 7:02 PM, radai <radai.rosenbl...@gmail.com> wrote:
> Hi Jun,
>
> 1. Yes, it is my own personal opinion that people use queued.max.requests as an indirect way to bound memory consumption. Once a more direct memory-bound mechanism exists (and works), I don't think queued.max.requests would be required. Having said that, I was not planning on making any changes w.r.t. queued.max.requests support (so I was aiming to get to a situation where both configs are supported) to allow gathering enough data/feedback.
>
> 2. Selector.poll() calls into KafkaChannel.read() to maybe get a NetworkReceive. Multiple such read() calls may be required until a Receive is produced (already the case in the current code base). My pool implementation is non-blocking, so if there's no memory available the read() call will return null. poll() would then move on to try and service other selection keys. The pool will be checked for available memory again the next time the SocketServer.run() loop gets to poll(), so right now I don't communicate memory becoming available to the selector - it will just go on to try and make progress elsewhere and come back again; I never block it or send it to sleep. I think for efficiency what could maybe be done is: if there's not enough memory to service a readable selection key, we may want to skip all other read-ready selection keys for that iteration of pollSelectionKeys(). That would require rather invasive changes around Selector.pollSelectionKeys() that I'd rather avoid. Also, different KafkaChannels may be backed by different memory pools (under some sort of future QoS scheme?), which would complicate such an optimization further. (See the read-path sketch after this message.)
>
> 3. I added the pool interface and implementation under kafka.common.memory, and the API is "thin" enough to be generally useful (currently it's non-blocking only, but a get(long maxWait) is definitely doable). Having said that, I'm not really familiar enough with the code to say....
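A rough sketch of the non-blocking read path radai describes in point 2: the 4-byte size header is read eagerly, the payload allocation is attempted against the pool, and a failed allocation simply produces no receive on this pass. Names are assumed; the real NetworkReceive/KafkaChannel code differs:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    // Sketch: a receive whose payload buffer comes from a non-blocking pool.
    public class PooledReceiveSketch {
        private final ByteBuffer sizeHeader = ByteBuffer.allocate(4); // always readable
        private final SimplePoolSketch pool; // hypothetical pool from the earlier sketch
        private int size = -1;               // payload size, once known
        private ByteBuffer payload;          // pool-allocated, possibly deferred

        public PooledReceiveSketch(SimplePoolSketch pool) {
            this.pool = pool;
        }

        /** Returns true when complete; may make no progress if the pool is empty. */
        public boolean readFrom(ReadableByteChannel channel) throws IOException {
            if (size < 0) {
                channel.read(sizeHeader);         // EOF handling omitted in this sketch
                if (sizeHeader.hasRemaining())
                    return false;                 // still reading the 4-byte size
                sizeHeader.flip();
                size = sizeHeader.getInt();       // size is now known and stored
            }
            if (payload == null) {
                payload = pool.tryAllocate(size); // non-blocking
                if (payload == null)
                    return false;                 // pool exhausted: retry on a later poll
            }
            channel.read(payload);
            return !payload.hasRemaining();       // complete once the payload is full
        }
    }

A failed allocation parks the connection just past the size header, so a later poll() resumes exactly where it left off; this is why, as radai notes above, the size, once read, cannot be put back and has to be stored.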
> On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao <j...@confluent.io> wrote:
> Hi, Radai,
>
> Thanks for the update. At the high level, this looks promising. A few comments below.
>
> 1. If we can bound the requests by bytes, it seems that we don't need queued.max.requests any more? Could we just deprecate the config and make the queue size unbounded?
>
> 2. How do we communicate back to the selector when some memory is freed up? We probably need to wake up the selector. For efficiency, perhaps we only need to wake up the selector if the buffer pool is full?
>
> 3. We talked about bounding the consumer's memory before. To fully support that, we will need to bound the memory used by different fetch responses in the consumer. Do you think the changes that you propose here can be leveraged to bound the memory in the consumer as well?
>
> Jun
>
> On Tue, Aug 30, 2016 at 10:41 AM, radai <radai.rosenbl...@gmail.com> wrote:
> My apologies for the delay in response.
>
> I agree with the concerns about OOM reading from the actual sockets and blocking the network threads - messing with the request queue itself would not do.
>
> I propose instead a memory pool approach - the broker would have a non-blocking memory pool. Upon reading the first 4 bytes out of a socket, an attempt would be made to acquire enough memory, and if that attempt fails the processing thread will move on to try and make progress with other tasks.
>
> I think it's simpler than mute/unmute, because using mute/unmute would require differentiating between sockets muted due to a request in progress (normal current operation) and sockets muted due to lack of memory.
> Sockets of the 1st kind would be unmuted at the end of request processing (as happens right now), but the 2nd kind would require some sort of "unmute watchdog" which is (I claim) more complicated than a memory pool. Also, a memory pool is a more generic solution.
>
> I've updated the KIP page (https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests) to reflect the new proposed implementation, and I've also put up an initial implementation proposal on github - https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool. The proposed code is not complete and tested yet (so probably buggy), but does include the main points of modification.
>
> The specific implementation of the pool on that branch also has a built-in safety net, where memory that is acquired but not released (which is a bug) is discovered when the garbage collector frees it, and the capacity is reclaimed.
>
> On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao <j...@confluent.io> wrote:
> Radai,
>
> Yes, I got the benefit of bounding the request queue by bytes. My concern is the following, if we don't change the behavior of the processor blocking on queue full.
>
> If the broker truly doesn't have enough memory for buffering outstanding requests from all connections, we have to either hit OOM or block the processor. Both will be bad, and I am not sure if one is clearly better than the other. In this case, the solution is probably to expand the cluster to reduce the per-broker request load.
>
> If the broker actually has enough memory, we want to be able to configure the request queue in such a way that it never blocks. You can tell people to just set the request queue to be unbounded, which may scare them. If we do want to put a bound, it seems it's easier to configure the queue size based on # requests. Basically, we can tell people to set the queue size based on the number of connections. If the queue is based on bytes, it's not clear how people should set it without causing the processor to block.
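To make the sizing trade-off concrete, a hypothetical worked example (the numbers are invented, not from the thread): with the count-based bound set per the rule of thumb above, the implied worst-case memory is the product of the two limits, which is the gap a byte-based bound closes directly.

    public class QueueSizingSketch {
        public static void main(String[] args) {
            long connections = 10_000;                 // queued.max.requests sized to #connections
            long maxRequestBytes = 100L * 1024 * 1024; // socket.request.max.bytes (the 100 MB default)
            long worstCaseQueueBytes = connections * maxRequestBytes;
            System.out.println(worstCaseQueueBytes);   // ~1 TB of queued requests in the worst case
        }
    }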
> Finally, Rajini has a good point. The ByteBuffer in the request object is allocated as soon as we see the first 4 bytes from the socket. So, I am not sure if just bounding the request queue itself is enough to bound the memory related to requests.
>
> Thanks,
>
> Jun
>
> On Mon, Aug 8, 2016 at 4:46 PM, radai <radai.rosenbl...@gmail.com> wrote:
> I agree that filling up the request queue can cause clients to time out (and presumably retry?). However, for the workloads where we expect this configuration to be useful, the alternative is currently an OOM crash. In my opinion an initial implementation of this feature could be constrained to a simple drop-in replacement of ArrayBlockingQueue (conditional, opt-in), and further study of behavior patterns under load can drive future changes to the API later, when those behaviors are better understood (like back-pressure, no-op filler responses to avoid client timeouts, or whatever). (A sketch of such a replacement appears below.)
>
> On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> Nice write-up Radai.
>
> I think what Jun said is a valid concern. If I am not wrong, as per the proposal we are depending on the entire pipeline to flow smoothly, from accepting requests to handling them, calling KafkaApis and handing back the responses.
>
> Thanks,
>
> Mayuresh
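radai's drop-in ArrayBlockingQueue replacement might look roughly like the sketch below: a blocking queue bounded by total bytes as well as element count. The names are entirely hypothetical, and error handling is simplified:

    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch: put() blocks while the byte budget is exhausted; take() frees budget.
    public class ByteBoundedQueueSketch<E> {

        public interface SizeFunction<E> {
            long sizeInBytes(E element);
        }

        private final LinkedBlockingQueue<E> queue;
        private final long maxBytes;
        private final SizeFunction<E> sizeOf;
        private final Object lock = new Object();
        private long queuedBytes = 0; // guarded by lock

        public ByteBoundedQueueSketch(int maxElements, long maxBytes, SizeFunction<E> sizeOf) {
            this.queue = new LinkedBlockingQueue<>(maxElements);
            this.maxBytes = maxBytes;
            this.sizeOf = sizeOf;
        }

        public void put(E element) throws InterruptedException {
            long size = sizeOf.sizeInBytes(element);
            synchronized (lock) {
                // admit at least one element even if it alone exceeds maxBytes
                while (queuedBytes > 0 && queuedBytes + size > maxBytes)
                    lock.wait();
                queuedBytes += size;
            }
            queue.put(element); // may still block on the element-count bound
        }

        public E take() throws InterruptedException {
            E element = queue.take();
            synchronized (lock) {
                queuedBytes -= sizeOf.sizeInBytes(element);
                lock.notifyAll(); // wake blocked producers
            }
            return element;
        }
    }

The SizeFunction is where Jun's question below, about getting an accurate size estimate for queued request objects, applies.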
> On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > Hi Becket,
> >
> > I don't think progress can be made in the processor's run loop if the queue fills up. i.e., I think Jun's point is that if the queue is full (either due to the proposed max.bytes or today due to max.requests hitting the limit) then processCompletedReceives will block and no further progress can be made.
>
> I'm sorry - this isn't right. There will be progress as long as the API handlers are able to pick requests off the request queue and add the responses to the response queues (which are effectively unbounded). However, the point is valid that blocking in the request channel's put has the effect of exacerbating the pressure on the socket server.
>
> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao <j...@confluent.io> wrote:
> Radai,
>
> Thanks for the proposal. A couple of comments on this.
>
> 1. Since we store request objects in the request queue, how do we get an accurate size estimate for those requests?
>
> 2. Currently, it's bad if the processor blocks on adding a request to the request queue.
> Once blocked, the processor can't process the sending of responses on other socket keys either. This will cause all clients in this processor with an outstanding request to eventually time out. Typically, this will trigger client-side retries, which will add more load on the broker and cause potentially more congestion in the request queue. With queued.max.requests, to prevent blocking on the request queue, our recommendation is to configure queued.max.requests to be the same as the number of socket connections on the broker. Since the broker never processes more than 1 request per connection at a time, the request queue will never be blocked. With queued.max.bytes, it's going to be harder to configure the value properly to prevent blocking.
>
> So, while adding queued.max.bytes is potentially useful for memory management, we probably need to address the processor-blocking issue for it to be truly useful in practice. One possibility is to apply back-pressure to the client when the request queue is blocked. For example, if the processor notices that the request queue is full, it can turn off the interest bit for read for all socket keys. This will allow the processor to continue handling responses.
> When the request queue has space again, it can indicate the new state to the processor and wake up the selector. Not sure how this will work with multiple processors though, since the request queue is shared across all processors.
>
> Thanks,
>
> Jun
>
> On Thu, Aug 4, 2016 at 11:28 AM, radai <radai.rosenbl...@gmail.com> wrote:
> Hello,
>
> I'd like to initiate a discussion about
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
>
> The goal of the KIP is to allow configuring a bound on the capacity (as in bytes of memory used) of the incoming request queue, in addition to the current bound on the number of messages.
>
> This comes after several incidents at LinkedIn where a sudden "spike" of large message batches caused an out-of-memory exception.
>
> Thank you,
>
> Radai
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>
> --
> Regards,
> Rajini