Hi Jun,

10 - mute/unmute functionality has been added in
https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting.
I have yet to run stress tests to compare its behaviour with and without muting.

11 - I've added a SimplePool implementation (nothing more than an
AtomicLong really) and compared it with my GC pool (that uses weak refs) -
https://github.com/radai-rosenblatt/kafka-benchmarks/tree/master/memorypool-benchmarks.
the results show no noticeable difference between the two. what the results
_do_ show though is that for large requests (1M) performance drops very sharply.
since the SimplePool is essentially identical to current kafka code
behaviour (the benchmark never reaches out-of-memory conditions) this
suggests to me that kafka performance for large requests suffers greatly from
the cost of allocating (and releasing) large buffers (instead of actually
pooling them for later re-use). this means that a memory pool
implementation that actually pools ( :-) ) is very likely to improve broker
performance for large requests.
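
to make "nothing more than an AtomicLong" concrete, here is a minimal sketch of such a pool. the class and method names (SimplePoolSketch, tryAllocate, release) are made up for illustration and are not the ones on the branch. it assumes the behaviour described elsewhere in this thread: non-blocking, and any size is granted as long as _some_ memory is available (so the counter can go negative). note that it allocates a fresh buffer every time, it does not actually pool anything.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch only; names are illustrative, not the ones used on the branch.
final class SimplePoolSketch {
    // Bytes still available; may go negative after an oversized grant.
    private final AtomicLong available;

    SimplePoolSketch(long capacityBytes) {
        this.available = new AtomicLong(capacityBytes);
    }

    // Non-blocking: returns null when no memory is available at all.
    // Any requested size is granted while the counter is still positive.
    ByteBuffer tryAllocate(int size) {
        while (true) {
            long current = available.get();
            if (current <= 0) {
                return null; // caller moves on and retries on a later poll()
            }
            if (available.compareAndSet(current, current - size)) {
                // A fresh allocation every time: this is the cost the benchmark measures.
                return ByteBuffer.allocate(size);
            }
        }
    }

    // Returns the buffer's capacity to the counter; the buffer itself is left to the GC.
    void release(ByteBuffer buffer) {
        available.addAndGet(buffer.capacity());
    }

    long availableBytes() {
        return available.get();
    }
}
```

a pool that really re-uses buffers would keep released buffers around (for example in per-size free lists) instead of dropping them in release().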

12 - if there was a single thread iterating over selection keys then
stopping at 1st unsatisfiable request might work (if iteration order over
selection keys is deterministic, which is OS-dependent). however, kafka
spawns multiple selectors sharing the same pool so i doubt the approach
would gain anything. also notice that the current code already shuffles the
selection keys if memory is low (<10%) to try and guarantee fairness.
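
the shuffling mentioned in 12 boils down to something like the sketch below. the helper name, its signature and the way the 10% threshold is computed are assumptions made for illustration; the real logic lives inside the selector and looks different.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; the name and signature are illustrative only.
final class KeyOrderSketch {
    // Returns the order in which ready keys would be visited.
    // The order is randomized only when less than 10% of the pool is free,
    // so that no single connection is always served first under pressure.
    static <K> List<K> iterationOrder(Collection<K> readyKeys,
                                      long availableBytes,
                                      long capacityBytes) {
        List<K> order = new ArrayList<>(readyKeys);
        if (availableBytes * 10 < capacityBytes) {
            Collections.shuffle(order);
        }
        return order;
    }
}
```

with several selectors sharing one pool, each selector shuffles only its own keys, so this gives fairness within a selector but not across selectors.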

attached the benchmark results for the pool implementations:

Benchmark                                        Mode  Cnt        Score        Error  Units
GarbageCollectedMemoryPoolBenchmark.alloc_100k  thrpt    5   198272.519 ±  16045.965  ops/s
GarbageCollectedMemoryPoolBenchmark.alloc_10k   thrpt    5  2781439.307 ± 185287.072  ops/s
GarbageCollectedMemoryPoolBenchmark.alloc_1k    thrpt    5  6029199.952 ± 465936.118  ops/s
GarbageCollectedMemoryPoolBenchmark.alloc_1m    thrpt    5    18464.272 ±    332.861  ops/s
SimpleMemoryPoolBenchmark.alloc_100k            thrpt    5   204240.066 ±   2207.619  ops/s
SimpleMemoryPoolBenchmark.alloc_10k             thrpt    5  3000794.525 ±  83510.836  ops/s
SimpleMemoryPoolBenchmark.alloc_1k              thrpt    5  5893671.778 ± 274239.541  ops/s
SimpleMemoryPoolBenchmark.alloc_1m              thrpt    5    18728.085 ±    792.563  ops/s
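
for reference, the weak-reference safety net behind the GC pool numbers above (a ReferenceQueue plus a set of outstanding buffer ids, as explained further down this thread) can be sketched roughly as follows. this is a simplified illustration and not the code on the branch: all names are hypothetical, the Lease handle is an invention to keep the example short, and the queue is drained by an explicit reclaimLeaked() call where the real pool uses a dedicated background thread blocked on the queue.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; names and the Lease handle are illustrative assumptions.
final class GcPoolSketch {
    // Handle given to the caller: the buffer plus its sequence id.
    static final class Lease {
        final long id;
        final ByteBuffer buffer;
        Lease(long id, ByteBuffer buffer) { this.id = id; this.buffer = buffer; }
    }

    // Weak reference that remembers the id and size of the buffer it tracks.
    private static final class TrackedRef extends WeakReference<ByteBuffer> {
        final long id;
        final int size;
        TrackedRef(long id, ByteBuffer buffer, ReferenceQueue<ByteBuffer> queue) {
            super(buffer, queue);
            this.id = id;
            this.size = buffer.capacity();
        }
    }

    private final AtomicLong available;
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, TrackedRef> outstanding = new ConcurrentHashMap<>();
    private final ReferenceQueue<ByteBuffer> collected = new ReferenceQueue<>();

    GcPoolSketch(long capacityBytes) { this.available = new AtomicLong(capacityBytes); }

    // Non-blocking; returns null when no memory is available.
    Lease tryAllocate(int size) {
        long current;
        do {
            current = available.get();
            if (current <= 0) return null;
        } while (!available.compareAndSet(current, current - size));
        ByteBuffer buffer = ByteBuffer.allocate(size);
        long id = nextId.incrementAndGet();
        outstanding.put(id, new TrackedRef(id, buffer, collected));
        return new Lease(id, buffer);
    }

    // Explicit return: stop tracking and clear the reference so it is never enqueued.
    void release(Lease lease) {
        TrackedRef ref = outstanding.remove(lease.id);
        if (ref == null) return; // unknown or already released
        ref.clear();
        available.addAndGet(ref.size);
    }

    // Reclaims capacity of buffers that were garbage collected without release().
    int reclaimLeaked() {
        int reclaimed = 0;
        Reference<? extends ByteBuffer> r;
        while ((r = collected.poll()) != null) {
            TrackedRef ref = (TrackedRef) r;
            if (outstanding.remove(ref.id) != null) {
                available.addAndGet(ref.size);
                System.err.println("warning: buffer " + ref.id + " was GC'ed without release()");
                reclaimed++;
            }
        }
        return reclaimed;
    }

    long availableBytes() { return available.get(); }
}
```

when a leaked buffer actually shows up on the queue depends entirely on the garbage collector, so this is a safety net for buggy code paths and not a substitute for calling release().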



On Sat, Sep 24, 2016 at 9:07 AM, Jun Rao <j...@confluent.io> wrote:

> Hi, Radi,
>
> For 10, yes, we don't want the buffer pool to wake up the selector every
> time some memory is freed up. We only want to do that when there is pending
> requests to the buffer pool not honored due to not enough memory.
>
> For 11, we probably want to be a bit careful with Weak References. In
> https://issues.apache.org/jira/browse/KAFKA-1989, we initially tried an
> implementation based on Weak Reference, but abandoned it due to too much GC
> overhead. It probably also makes the code a bit harder to understand. So,
> perhaps it would be better if we can avoid it.
>
> For 12, that's a good point. I thought the selector will do some shuffling
> for fairness. Perhaps we should stop allocating from the buffer pool when
> we see the first key whose memory can't be honored?
>
> Thanks,
>
> Jun
>
>
> On Sat, Sep 24, 2016 at 8:44 AM, radai <radai.rosenbl...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > 10 - I'll add this functionality to the mute/unmute branch. as every
> > mute/unmute operation is O(#connections / #selectorThreads) maybe a
> > watermark approach is better than waking when _any_ mem is available?
> >
> > 11 - "gc notifications" are done by using a ReferenceQueue (
> > https://docs.oracle.com/javase/8/docs/api/java/lang/ref/ReferenceQueue.html)
> > in combination with weak references to allocated buffers. when a buffer is
> > reclaimed by the GC the corresponding weak ref to it is enqueued. the pool
> > maintains a set of outstanding buffer IDs (every allocated buffer gets a
> > unique id - basically a sequence). a buffer explicitly returned has its id
> > removed from the tracking set and the weak reference to it destroyed, so
> > its reference will never be enqueued by the GC even if it is GC'ed later.
> > an enqueued reference (which indicates a buffer not returned to pool) also
> > carries the buffer id, which is then removed from the outstanding buffers
> > set and the memory marked as available (and a warning printed). the pool
> > has a background thread dedicated to reading references out of the queue
> > (which under normal conditions remains blocked forever).
> >
> > 12 - the issue here is that a single "large" request (say 1MB) can get
> > blocked indefinitely under a high pressure of much smaller requests (say
> > 1KB) keeping memory utilization close to 100%. if we dont care about
> > potential starvation the change is in a single condition. i'll make this
> > configurable.
> >
> > 13 - I'll change the parameter name.
> >
> > On Fri, Sep 23, 2016 at 3:38 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Radai,
> > >
> > > Thanks for the updated KIP. A few more questions/comments below.
> > >
> > > 10. For "the mute/unmute happens just before poll(), which means as a
> > > worst case there will be no reads for 300ms if memory was unavailable", I am
> > > thinking that memory-pool could track if there is any pending request and
> > > wake up the selector when memory is released and there is a pending
> > > request. This way, poll() doesn't have to wait for the timeout if memory
> > > frees up early.
> > >
> > > 11. For "to facilitate faster implementation (as a safety net) the pool
> > > will be implemented in such a way that memory that was not release()ed
> > > (but still garbage collected) would be detected and "reclaimed". this is to
> > > prevent "leaks" in case of code paths that fail to release() properly.",
> > > could you explain a bit at the high level how this is done?
> > >
> > > 12. For "As the pool would allow any size request if it has any capacity
> > > available, the actual memory bound is queued.max.bytes +
> > > socket.request.max.bytes.", it seems intuitively, the pool should only
> > > give the Buffer back if it has enough available bytes. Then the request
> > > memory can be bounded by queued.max.bytes. We can validate that
> > > queued.max.bytes is at least socket.request.max.bytes.
> > >
> > > 13. For the naming, it seems request.queue.max.bytes is clearer than
> > > queue.max.bytes.
> > >
> > > Jun
> > >
> > >
> > >
> > > On Thu, Sep 22, 2016 at 10:53 AM, radai <radai.rosenbl...@gmail.com>
> > > wrote:
> > >
> > > > As discussed in the KIP call, I have updated the kip-72 page (
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
> > > > to record both configuration validations and implementation concerns.
> > > > I've also implemented channel muting/unmuting in response to memory
> > > > pressure. it's available as a separate branch here -
> > > > https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting
> > > > . the implementation without muting is here -
> > > > https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.
> > > >
> > > > the mute/unmute happens just before poll(), which means as a worst case
> > > > there will be no reads for 300ms if memory was unavailable (that's the
> > > > timeout until the next poll). perhaps a design with dedicated read threads
> > > > could do better (such a thread could actually block waiting for memory),
> > > > but that would be a giant change.
> > > >
> > > > On Tue, Sep 13, 2016 at 9:20 AM, radai <radai.rosenbl...@gmail.com>
> > > wrote:
> > > >
> > > > > the specific memory pool implementation i wrote will allocate _any_ amount
> > > > > you request if it has _any_ memory available (so if it has 1 byte available
> > > > > and you ask for 1MB you will get 1MB and the counter will go negative).
> > > > > this was done to avoid issues with starvation of large requests. other
> > > > > implementations may be more strict. to me this means that generally its not
> > > > > a simple "have memory" vs "no memory" split (which gets worse under a
> > > > > hypothetical tiered pool scheme for QoS).
> > > > >
> > > > > to allow this flexibility in pool implementation i must preserve the
> > > > > amount of memory required. once read from the channel i cant put it back,
> > > > > so i store it?
> > > > >
> > > > > On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <
> > > > > rajinisiva...@googlemail.com> wrote:
> > > > >
> > > > > >> Is there any value in allowing the 4-byte size to be read even when the
> > > > > >> request memory limit has been reached? If not, you can disable OP_READ
> > > > > >> interest for all channels that are ready inside Selector.poll() when
> > > > > >> memory limit has been reached and re-enable before returning from poll().
> > > > > >> Perhaps a listener that is invoked when MemoryPool moves from unavailable to
> > > > > >> available state can wakeup the selector. The changes for this should be
> > > > > >> fairly contained without any additional channel state. And it would avoid
> > > > > >> the overhead of polls that return immediately even when progress cannot be
> > > > > >> made because memory limit has been reached.
> > > > >>
> > > > >> On Tue, Sep 13, 2016 at 12:31 AM, radai <
> radai.rosenbl...@gmail.com
> > >
> > > > >> wrote:
> > > > >>
> > > > >> > Hi Jun,
> > > > >> >
> > > > > >> > Yes, youre right - right now the next select() call will return immediately
> > > > > >> > with the same set of keys as earlier (at least) as they were not previously
> > > > > >> > handled (no memory).
> > > > > >> > My assumption is that this happens under considerable load - something has
> > > > > >> > to be occupying all this memory. also, this happens in the context of
> > > > > >> > SocketServer.Processor.run():
> > > > >> >
> > > > >> > while (isRunning) {
> > > > >> >    configureNewConnections()
> > > > >> >    processNewResponses()
> > > > >> >    poll()   <------ HERE
> > > > >> >    processCompletedReceives()
> > > > >> >    processCompletedSends()
> > > > >> >    processDisconnected()
> > > > >> > }
> > > > >> >
> > > > > >> > even within poll(), things like finishConnection(), prepare(), and write()s
> > > > > >> > can still make progress under low memory conditions. and given the load,
> > > > > >> > there's probably progress to be made in processCompletedReceives(),
> > > > > >> > processCompletedSends() and processDisconnected().
> > > > > >> >
> > > > > >> > if there's progress to be made in other things its likely that the next
> > > > > >> > call to poll() will not happen immediately and so the loop wont be that
> > > > > >> > tight. in order for this to devolve into true busy waiting you would need a
> > > > > >> > situation where no progress can be made on any in-progress requests and no
> > > > > >> > responses to send out ?
> > > > > >> >
> > > > > >> > if my assumption does not hold then you are correct, and selector.poll(300)
> > > > > >> > currently hardcoded in SocketServer.Processor.poll() would need to be
> > > > > >> > replaced with something more complicated. my biggest point of concern
> > > > > >> > though is that the resulting code would be complicated and would couple
> > > > > >> > Selector to the memory pool very tightly. under my current patch Selector
> > > > > >> > needs the memory pool only to pass to channels when they are built. this
> > > > > >> > would allow different memory pools relatively easily for things like
> > > > > >> > reserving memory for cross-broker replication and high-SLA connections. a
> > > > > >> > tighter coupling would make any such future modification hard.
> > > > >> >
> > > > >> > On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao <j...@confluent.io>
> > wrote:
> > > > >> >
> > > > >> > > Hi, Radai,
> > > > >> > >
> > > > >> > > Thanks for the reply. I still have a followup question on #2.
> > > > >> > >
> > > > > >> > > My understanding is that in your proposal, selector will now first read the
> > > > > >> > > size of the Receive. If there is not enough memory, it has to turn off the
> > > > > >> > > READ interest bit for the corresponding KafkaChannel. Otherwise, subsequent
> > > > > >> > > selector.poll() call will always return immediately, adding unnecessary
> > > > > >> > > overhead. If you do that, the Selector will need to know when to turn on
> > > > > >> > > the READ interest bit again. It may not be enough to do this check until
> > > > > >> > > the next poll call since the timeout used by poll() could be arbitrarily
> > > > > >> > > large. So, it seems that some kind of coordination between the Selector and
> > > > > >> > > the bufferpool is needed?
> > > > >> > >
> > > > >> > > Jun
> > > > >> > >
> > > > >> > > On Thu, Sep 8, 2016 at 7:02 PM, radai <
> > radai.rosenbl...@gmail.com
> > > >
> > > > >> > wrote:
> > > > >> > >
> > > > >> > > > Hi Jun,
> > > > >> > > >
> > > > > >> > > > 1. yes, it is my own personal opinion that people use queued.max.requests
> > > > > >> > > > as an indirect way to bound memory consumption. once a more direct memory
> > > > > >> > > > bound mechanism exists (and works) i dont think queued.max.requests would
> > > > > >> > > > be required. having said that I was not planning on making any changes
> > > > > >> > > > w.r.t queued.max.requests support (so I was aiming to get to a situation
> > > > > >> > > > where both configs are supported) to allow gathering enough data/feedback.
> > > > >> > > >
> > > > > >> > > > 2. Selector.poll() calls into KafkaChannel.read() to maybe get a
> > > > > >> > > > NetworkReceive. multiple such read() calls may be required until a Receive
> > > > > >> > > > is produced already in the current code base. my pool implementation is
> > > > > >> > > > non-blocking so if there's no memory available the read() call will return
> > > > > >> > > > null. poll() would then move on to try and service other selection keys.
> > > > > >> > > > the pool will be checked for available memory again the next time the
> > > > > >> > > > SocketServer.run() loop gets to poll(). and so right now I dont communicate
> > > > > >> > > > memory becoming available to the selector - it will just go on to try and
> > > > > >> > > > make progress elsewhere and come back again. i never block it or send it to
> > > > > >> > > > sleep. I think for efficiency what could maybe be done is if there's not
> > > > > >> > > > enough memory to service a readable selection key we may want to skip all
> > > > > >> > > > other read-ready selection keys for that iteration of pollSelectionKeys().
> > > > > >> > > > that would require rather invasive changes around
> > > > > >> > > > Selector.pollSelectionKeys() that I'd rather avoid. also different
> > > > > >> > > > KafkaChannels may be backed by different memory pool (under some sort of
> > > > > >> > > > future QoS scheme?), which would complicate such an optimization further.
> > > > >> > > >
> > > > > >> > > > 3. i added the pool interface and implementation under kafka.common.memory,
> > > > > >> > > > and the API is "thin" enough to be generally useful (currently its
> > > > > >> > > > non-blocking only, but a get(long maxWait) is definitely doable). having
> > > > > >> > > > said that, I'm not really familiar enough with the code to say....
> > > > >> > > >
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao <j...@confluent.io>
> > > wrote:
> > > > >> > > >
> > > > >> > > > > Hi, Radi,
> > > > >> > > > >
> > > > >> > > > > Thanks for the update. At the high level, this looks
> > > promising.
> > > > A
> > > > >> few
> > > > >> > > > > comments below.
> > > > >> > > > >
> > > > >> > > > > 1. If we can bound the requests by bytes, it seems that we
> > > don't
> > > > >> need
> > > > >> > > > > queued.max.requests
> > > > >> > > > > any more? Could we just deprecate the config and make the
> > > queue
> > > > >> size
> > > > >> > > > > unbounded?
> > > > >> > > > > 2. How do we communicate back to the selector when some
> > memory
> > > > is
> > > > >> > freed
> > > > >> > > > up?
> > > > >> > > > > We probably need to wake up the selector. For efficiency,
> > > > perhaps
> > > > >> we
> > > > >> > > only
> > > > >> > > > > need to wake up the selector if the bufferpool is full?
> > > > >> > > > > 3. We talked about bounding the consumer's memory before.
> To
> > > > fully
> > > > >> > > > support
> > > > >> > > > > that, we will need to bound the memory used by different
> > fetch
> > > > >> > > responses
> > > > >> > > > in
> > > > >> > > > > the consumer. Do you think the changes that you propose
> here
> > > can
> > > > >> be
> > > > >> > > > > leveraged to bound the memory in the consumer as well?
> > > > >> > > > >
> > > > >> > > > > Jun
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > On Tue, Aug 30, 2016 at 10:41 AM, radai <
> > > > >> radai.rosenbl...@gmail.com>
> > > > >> > > > > wrote:
> > > > >> > > > >
> > > > >> > > > > > My apologies for the delay in response.
> > > > >> > > > > >
> > > > >> > > > > > I agree with the concerns about OOM reading from the
> > actual
> > > > >> sockets
> > > > >> > > and
> > > > >> > > > > > blocking the network threads - messing with the request
> > > queue
> > > > >> > itself
> > > > >> > > > > would
> > > > >> > > > > > not do.
> > > > >> > > > > >
> > > > >> > > > > > I propose instead a memory pool approach - the broker
> > would
> > > > >> have a
> > > > >> > > non
> > > > >> > > > > > blocking memory pool. upon reading the first 4 bytes out
> > of
> > > a
> > > > >> > socket
> > > > >> > > an
> > > > >> > > > > > attempt would be made to acquire enough memory and if
> that
> > > > >> attempt
> > > > >> > > > fails
> > > > >> > > > > > the processing thread will move on to try and make
> > progress
> > > > with
> > > > >> > > other
> > > > >> > > > > > tasks.
> > > > >> > > > > >
> > > > >> > > > > > I think Its simpler than mute/unmute because using
> > > mute/unmute
> > > > >> > would
> > > > >> > > > > > require differentiating between sockets muted due to a
> > > request
> > > > >> in
> > > > >> > > > > progress
> > > > >> > > > > > (normal current operation) and sockets muted due to lack
> > of
> > > > >> memory.
> > > > >> > > > > sockets
> > > > >> > > > > > of the 1st kind would be unmuted at the end of request
> > > > >> processing
> > > > >> > (as
> > > > >> > > > it
> > > > >> > > > > > happens right now) but the 2nd kind would require some
> > sort
> > > of
> > > > >> > > "unmute
> > > > >> > > > > > watchdog" which is (i claim) more complicated than a
> > memory
> > > > >> pool.
> > > > >> > > also
> > > > >> > > > a
> > > > >> > > > > > memory pool is a more generic solution.
> > > > >> > > > > >
> > > > > >> > > > > > I've updated the KIP page (
> > > > > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
> > > > > >> > > > > > to reflect the new proposed implementation, and i've also put up an initial
> > > > > >> > > > > > implementation proposal on github -
> > > > > >> > > > > > https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool .
> > > > > >> > > > > > the proposed code is not complete and tested yet (so probably buggy) but does
> > > > > >> > > > > > include the main points of modification.
> > > > >> > > > > >
> > > > >> > > > > > the specific implementation of the pool on that branch
> > also
> > > > has
> > > > >> a
> > > > >> > > built
> > > > >> > > > > in
> > > > >> > > > > > safety net where memory that is acquired but not
> released
> > > > (which
> > > > >> > is a
> > > > >> > > > > bug)
> > > > >> > > > > > is discovered when the garbage collector frees it and
> the
> > > > >> capacity
> > > > >> > is
> > > > >> > > > > > reclaimed.
> > > > >> > > > > >
> > > > >> > > > > > On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao <
> j...@confluent.io
> > >
> > > > >> wrote:
> > > > >> > > > > >
> > > > >> > > > > > > Radi,
> > > > >> > > > > > >
> > > > >> > > > > > > Yes, I got the benefit of bounding the request queue
> by
> > > > >> bytes. My
> > > > >> > > > > concern
> > > > >> > > > > > > is the following if we don't change the behavior of
> > > > processor
> > > > >> > > > blocking
> > > > >> > > > > on
> > > > >> > > > > > > queue full.
> > > > >> > > > > > >
> > > > >> > > > > > > If the broker truly doesn't have enough memory for
> > > buffering
> > > > >> > > > > outstanding
> > > > >> > > > > > > requests from all connections, we have to either hit
> OOM
> > > or
> > > > >> block
> > > > >> > > the
> > > > >> > > > > > > processor. Both will be bad. I am not sure if one is
> > > clearly
> > > > >> > better
> > > > >> > > > > than
> > > > >> > > > > > > the other. In this case, the solution is probably to
> > > expand
> > > > >> the
> > > > >> > > > cluster
> > > > >> > > > > > to
> > > > >> > > > > > > reduce the per broker request load.
> > > > >> > > > > > >
> > > > >> > > > > > > If the broker actually has enough memory, we want to
> be
> > > able
> > > > >> to
> > > > >> > > > > configure
> > > > >> > > > > > > the request queue in such a way that it never blocks.
> > You
> > > > can
> > > > >> > tell
> > > > >> > > > > people
> > > > >> > > > > > > to just set the request queue to be unbounded, which
> may
> > > > scare
> > > > >> > > them.
> > > > >> > > > If
> > > > >> > > > > > we
> > > > >> > > > > > > do want to put a bound, it seems it's easier to
> > configure
> > > > the
> > > > >> > queue
> > > > >> > > > > size
> > > > >> > > > > > > based on # requests. Basically, we can tell people to
> > set
> > > > the
> > > > >> > queue
> > > > >> > > > > size
> > > > >> > > > > > > based on number of connections. If the queue is based
> on
> > > > >> bytes,
> > > > >> > > it's
> > > > >> > > > > not
> > > > >> > > > > > > clear how people should set it w/o causing the
> processor
> > > to
> > > > >> > block.
> > > > >> > > > > > >
> > > > >> > > > > > > Finally, Rajini has a good point. The ByteBuffer in
> the
> > > > >> request
> > > > >> > > > object
> > > > >> > > > > is
> > > > >> > > > > > > allocated as soon as we see the first 4 bytes from the
> > > > socket.
> > > > >> > So,
> > > > >> > > I
> > > > >> > > > am
> > > > >> > > > > > not
> > > > >> > > > > > > sure if just bounding the request queue itself is
> enough
> > > to
> > > > >> bound
> > > > >> > > the
> > > > >> > > > > > > memory related to requests.
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks,
> > > > >> > > > > > >
> > > > >> > > > > > > Jun
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > On Mon, Aug 8, 2016 at 4:46 PM, radai <
> > > > >> > radai.rosenbl...@gmail.com>
> > > > >> > > > > > wrote:
> > > > >> > > > > > >
> > > > >> > > > > > > > I agree that filling up the request queue can cause
> > > > clients
> > > > >> to
> > > > >> > > time
> > > > >> > > > > out
> > > > >> > > > > > > > (and presumably retry?). However, for the workloads
> > > where
> > > > we
> > > > >> > > expect
> > > > >> > > > > > this
> > > > >> > > > > > > > configuration to be useful the alternative is
> > currently
> > > an
> > > > >> OOM
> > > > >> > > > crash.
> > > > >> > > > > > > > In my opinion an initial implementation of this
> > feature
> > > > >> could
> > > > >> > be
> > > > >> > > > > > > > constrained to a simple drop-in replacement of
> > > > >> > ArrayBlockingQueue
> > > > >> > > > > > > > (conditional, opt-in) and further study of behavior
> > > > patterns
> > > > >> > > under
> > > > >> > > > > load
> > > > >> > > > > > > can
> > > > >> > > > > > > > drive future changes to the API later when those
> > > behaviors
> > > > >> are
> > > > >> > > > better
> > > > >> > > > > > > > understood (like back-pressure, nop filler responses
> > to
> > > > >> avoid
> > > > >> > > > client
> > > > >> > > > > > > > timeouts or whatever).
> > > > >> > > > > > > >
> > > > >> > > > > > > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat <
> > > > >> > > > > > > > gharatmayures...@gmail.com>
> > > > >> > > > > > > > wrote:
> > > > >> > > > > > > >
> > > > >> > > > > > > > > Nice write up Radai.
> > > > >> > > > > > > > > I think what Jun said is a valid concern.
> > > > >> > > > > > > > > If I am not wrong as per the proposal, we are
> > > depending
> > > > on
> > > > >> > the
> > > > >> > > > > entire
> > > > >> > > > > > > > > pipeline to flow smoothly from accepting requests
> to
> > > > >> handling
> > > > >> > > it,
> > > > >> > > > > > > calling
> > > > >> > > > > > > > > KafkaApis and handing back the responses.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Thanks,
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Mayuresh
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy <
> > > > >> > > jjkosh...@gmail.com
> > > > >> > > > >
> > > > >> > > > > > > wrote:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > .
> > > > >> > > > > > > > > > >>
> > > > >> > > > > > > > > > >>
> > > > >> > > > > > > > > > > Hi Becket,
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > I don't think progress can be made in the
> > > > processor's
> > > > >> run
> > > > >> > > > loop
> > > > >> > > > > if
> > > > >> > > > > > > the
> > > > >> > > > > > > > > > > queue fills up. i.e., I think Jun's point is
> > that
> > > if
> > > > >> the
> > > > >> > > > queue
> > > > >> > > > > is
> > > > >> > > > > > > > full
> > > > >> > > > > > > > > > > (either due to the proposed max.bytes or today
> > due
> > > > to
> > > > >> > > > > > max.requests
> > > > >> > > > > > > > > > hitting
> > > > >> > > > > > > > > > > the limit) then processCompletedReceives will
> > > block
> > > > >> and
> > > > >> > no
> > > > >> > > > > > further
> > > > >> > > > > > > > > > progress
> > > > >> > > > > > > > > > > can be made.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > I'm sorry - this isn't right. There will be
> > progress
> > > > as
> > > > >> > long
> > > > >> > > as
> > > > >> > > > > the
> > > > >> > > > > > > API
> > > > >> > > > > > > > > > handlers are able to pick requests off the
> request
> > > > queue
> > > > >> > and
> > > > >> > > > add
> > > > >> > > > > > the
> > > > >> > > > > > > > > > responses to the response queues (which are
> > > > effectively
> > > > >> > > > > unbounded).
> > > > >> > > > > > > > > > However, the point is valid that blocking in the
> > > > request
> > > > >> > > > > channel's
> > > > >> > > > > > > put
> > > > >> > > > > > > > > has
> > > > >> > > > > > > > > > the effect of exacerbating the pressure on the
> > > socket
> > > > >> > server.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >>
> > > > >> > > > > > > > > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao <
> > > > >> > > j...@confluent.io>
> > > > >> > > > > > > wrote:
> > > > >> > > > > > > > > > >>
> > > > >> > > > > > > > > > >> > Radai,
> > > > >> > > > > > > > > > >> >
> > > > >> > > > > > > > > > >> > Thanks for the proposal. A couple of
> comments
> > > on
> > > > >> this.
> > > > >> > > > > > > > > > >> >
> > > > >> > > > > > > > > > >> > 1. Since we store request objects in the
> > > request
> > > > >> > queue,
> > > > >> > > > how
> > > > >> > > > > do
> > > > >> > > > > > > we
> > > > >> > > > > > > > > get
> > > > >> > > > > > > > > > an
> > > > >> > > > > > > > > > >> > accurate size estimate for those requests?
> > > > >> > > > > > > > > > >> >
> > > > >> > > > > > > > > > >> > 2. Currently, it's bad if the processor
> > blocks
> > > on
> > > > >> > > adding a
> > > > >> > > > > > > request
> > > > >> > > > > > > > > to
> > > > >> > > > > > > > > > >> the
> > > > >> > > > > > > > > > >> > request queue. Once blocked, the processor
> > > can't
> > > > >> > process
> > > > >> > > > the
> > > > >> > > > > > > > sending
> > > > >> > > > > > > > > > of
> > > > >> > > > > > > > > > >> > responses of other socket keys either. This
> > > will
> > > > >> cause
> > > > >> > > all
> > > > >> > > > > > > clients
> > > > >> > > > > > > > > in
> > > > >> > > > > > > > > > >> this
> > > > >> > > > > > > > > > >> > processor with an outstanding request to
> > > > eventually
> > > > >> > > > timeout.
> > > > >> > > > > > > > > > Typically,
> > > > >> > > > > > > > > > >> > this will trigger client-side retries,
> which
> > > will
> > > > >> add
> > > > >> > > more
> > > > >> > > > > > load
> > > > >> > > > > > > on
> > > > >> > > > > > > > > the
> > > > >> > > > > > > > > > >> > broker and cause potentially more
> congestion
> > in
> > > > the
> > > > >> > > > request
> > > > >> > > > > > > queue.
> > > > >> > > > > > > > > > With
> > > > >> > > > > > > > > > >> > queued.max.requests, to prevent blocking on
> > the
> > > > >> > request
> > > > >> > > > > queue,
> > > > >> > > > > > > our
> > > > >> > > > > > > > > > >> > recommendation is to configure
> > > > queued.max.requests
> > > > >> to
> > > > >> > be
> > > > >> > > > the
> > > > >> > > > > > > same
> > > > >> > > > > > > > as
> > > > >> > > > > > > > > > the
> > > > >> > > > > > > > > > >> > number of socket connections on the broker.
> > > Since
> > > > >> the
> > > > >> > > > broker
> > > > >> > > > > > > never
> > > > >> > > > > > > > > > >> > processes more than 1 request per
> connection
> > > at a
> > > > >> > time,
> > > > >> > > > the
> > > > >> > > > > > > > request
> > > > >> > > > > > > > > > >> queue
> > > > >> > > > > > > > > > >> > will never be blocked. With
> queued.max.bytes,
> > > > it's
> > > > >> > going
> > > > >> > > > to
> > > > >> > > > > be
> > > > >> > > > > > > > > harder
> > > > >> > > > > > > > > > to
> > > > >> > > > > > > > > > >> > configure the value properly to prevent
> > > blocking.
> > > > >> > > > > > > > > > >> >
> > > > >> > > > > > > > > > >> > So, while adding queued.max.bytes is
> > > potentially
> > > > >> > useful
> > > > >> > > > for
> > > > >> > > > > > > memory
> > > > >> > > > > > > > > > >> > management, for it to be truly useful, we
> > > > probably
> > > > >> > need
> > > > >> > > to
> > > > >> > > > > > > address
> > > > >> > > > > > > > > the
> > > > >> > > > > > > > > > >> > processor blocking issue for it to be
> really
> > > > >> useful in
> > > > >> > > > > > practice.
> > > > >> > > > > > > > One
> > > > >> > > > > > > > > > >> > possibility is to put back-pressure to the
> > > client
> > > > >> when
> > > > >> > > the
> > > > >> > > > > > > request
> > > > >> > > > > > > > > > >> queue is
> > > > >> > > > > > > > > > >> > blocked. For example, if the processor
> > notices
> > > > that
> > > > >> > the
> > > > >> > > > > > request
> > > > >> > > > > > > > > queue
> > > > >> > > > > > > > > > is
> > > > >> > > > > > > > > > >> > full, it can turn off the interest bit for
> > read
> > > > for
> > > > >> > all
> > > > >> > > > > socket
> > > > >> > > > > > > > keys.
> > > > >> > > > > > > > > > >> This
> > > > >> > > > > > > > > > >> > will allow the processor to continue
> handling
> > > > >> > responses.
> > > > >> > > > > When
> > > > >> > > > > > > the
> > > > >> > > > > > > > > > >> request
> > > > >> > > > > > > > > > >> > queue has space again, it can indicate the
> > new
> > > > >> state
> > > > >> > to
> > > > >> > > > the
> > > > >> > > > > > > > process
> > > > >> > > > > > > > > > and
> > > > >> > > > > > > > > > >> > wake up the selector. Not sure how this
> will
> > > work
> > > > >> with
> > > > >> > > > > > multiple
> > > > >> > > > > > > > > > >> processors
> > > > >> > > > > > > > > > >> > though since the request queue is shared
> > across
> > > > all
> > > > >> > > > > > processors.
> > > > >> > > > > > > > > > >> >
> > > > >> > > > > > > > > > >> > Thanks,
> > > > >> > > > > > > > > > >> >
> > > > >> > > > > > > > > > >> > Jun
> > > > >> > > > > > > > > > >> >
> > > > >> > > > > > > > > > >> >
> > > > >> > > > > > > > > > >> >
> > > > >> > > > > > > > > > >> > On Thu, Aug 4, 2016 at 11:28 AM, radai <
> > > > >> > > > > > > > radai.rosenbl...@gmail.com>
> > > > >> > > > > > > > > > >> wrote:
> > > > >> > > > > > > > > > >> >
> > > > >> > > > > > > > > > >> > > Hello,
> > > > >> > > > > > > > > > >> > >
> > > > >> > > > > > > > > > >> > > I'd like to initiate a discussion about
> > > > >> > > > > > > > > > >> > > https://cwiki.apache.org/
> > > > >> > > confluence/display/KAFKA/KIP-
> > > > >> > > > > > > > > > >> > > 72%3A+Allow+Sizing+Incoming+Re
> > > > >> quest+Queue+in+Bytes
> > > > >> > > > > > > > > > >> > >
> > > > >> > > > > > > > > > >> > > The goal of the KIP is to allow
> > configuring a
> > > > >> bound
> > > > >> > on
> > > > >> > > > the
> > > > >> > > > > > > > > capacity
> > > > >> > > > > > > > > > >> (as
> > > > >> > > > > > > > > > >> > in
> > > > >> > > > > > > > > > >> > > bytes of memory used) of the incoming
> > request
> > > > >> queue,
> > > > >> > > in
> > > > >> > > > > > > addition
> > > > >> > > > > > > > > to
> > > > >> > > > > > > > > > >> the
> > > > >> > > > > > > > > > >> > > current bound on the number of messages.
> > > > >> > > > > > > > > > >> > >
> > > > >> > > > > > > > > > >> > > This comes after several incidents at
> > > Linkedin
> > > > >> > where a
> > > > >> > > > > > sudden
> > > > >> > > > > > > > > > "spike"
> > > > >> > > > > > > > > > >> of
> > > > >> > > > > > > > > > >> > > large message batches caused an out of
> > memory
> > > > >> > > exception.
> > > > >> > > > > > > > > > >> > >
> > > > >> > > > > > > > > > >> > > Thank you,
> > > > >> > > > > > > > > > >> > >
> > > > >> > > > > > > > > > >> > >    Radai
> > > > >> > > > > > > > > > >> > >
> > > > >> > > > > > > > > > >> >
> > > > >> > > > > > > > > > >>
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > --
> > > > >> > > > > > > > > -Regards,
> > > > >> > > > > > > > > Mayuresh R. Gharat
> > > > >> > > > > > > > > (862) 250-7125
> > > > >> > > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Regards,
> > > > >>
> > > > >> Rajini
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
