Hi, Radai,

Sorry for the late response. How should the benchmark results be
interpreted? The higher the ops/s, the better? It would also be useful to
test this out on LinkedIn's traffic with enough socket connections to see
if there is any performance degradation.

Also, there is a separate proposal, KIP-81, to bound the consumer's memory
usage. Perhaps you can chime in there on whether this proposal can be
utilized there too.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer

Thanks,

Jun

On Tue, Sep 27, 2016 at 10:23 AM, radai <radai.rosenbl...@gmail.com> wrote:

> Hi Jun,
>
> 10 - mute/unmute functionality has been added in
> https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting.
> I have yet to run stress tests to see how it behaves versus without muting.
>
> 11 - I've added a SimplePool implementation (nothing more than an
> AtomicLong really) and compared it with my GC pool (that uses weak refs) -
> https://github.com/radai-rosenblatt/kafka-benchmarks/tree/master/memorypool-benchmarks.
> The results show no noticeable difference between the two. What the results
> _do_ show, though, is that for large requests (1MB) performance drops very
> sharply. Since the SimplePool is essentially identical to current Kafka
> code behaviour (the benchmark never reaches out-of-memory conditions), this
> suggests to me that Kafka performance for large requests suffers greatly
> from the cost of allocating (and releasing) large buffers instead of
> actually pooling them for later re-use. This means that an implementation
> of a memory pool that actually pools ( :-) ) is very likely to improve
> broker performance for large requests.
>
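> a minimal sketch of what such a "simple" non-blocking pool looks like
> (illustrative, not the exact branch code - note the counter is allowed
> to go negative, matching the "any size if any capacity" behaviour
> discussed further down-thread):
>
> import java.nio.ByteBuffer;
> import java.util.concurrent.atomic.AtomicLong;
>
> public class SimplePool {
>     private final AtomicLong available; // may go negative transiently
>
>     public SimplePool(long capacityBytes) {
>         this.available = new AtomicLong(capacityBytes);
>     }
>
>     // non-blocking: returns null only if no capacity at all is left
>     public ByteBuffer tryAllocate(int sizeBytes) {
>         if (available.get() <= 0)
>             return null;
>         available.addAndGet(-sizeBytes); // grant even if this overdraws
>         return ByteBuffer.allocate(sizeBytes);
>     }
>
>     public void release(ByteBuffer buffer) {
>         available.addAndGet(buffer.capacity());
>     }
> }
>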
> 12 - if there were a single thread iterating over selection keys, then
> stopping at the first unsatisfiable request might work (if iteration order
> over selection keys were deterministic, which is OS-dependent). However,
> Kafka spawns multiple selectors sharing the same pool, so I doubt the
> approach would gain anything. Also note that the current code already
> shuffles the selection keys if memory is low (<10%) to try and guarantee
> fairness.
>
> attached are the benchmark results for the pool implementations:
>
> Benchmark                                        Mode  Cnt        Score        Error  Units
> GarbageCollectedMemoryPoolBenchmark.alloc_100k  thrpt    5   198272.519 ±  16045.965  ops/s
> GarbageCollectedMemoryPoolBenchmark.alloc_10k   thrpt    5  2781439.307 ± 185287.072  ops/s
> GarbageCollectedMemoryPoolBenchmark.alloc_1k    thrpt    5  6029199.952 ± 465936.118  ops/s
> GarbageCollectedMemoryPoolBenchmark.alloc_1m    thrpt    5    18464.272 ±    332.861  ops/s
> SimpleMemoryPoolBenchmark.alloc_100k            thrpt    5   204240.066 ±   2207.619  ops/s
> SimpleMemoryPoolBenchmark.alloc_10k             thrpt    5  3000794.525 ±  83510.836  ops/s
> SimpleMemoryPoolBenchmark.alloc_1k              thrpt    5  5893671.778 ± 274239.541  ops/s
> SimpleMemoryPoolBenchmark.alloc_1m              thrpt    5    18728.085 ±    792.563  ops/s
>
>
>
> On Sat, Sep 24, 2016 at 9:07 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Radai,
> >
> > For 10, yes, we don't want the buffer pool to wake up the selector every
> > time some memory is freed up. We only want to do that when there are
> > pending requests to the buffer pool that were not honored due to
> > insufficient memory.
> >
> > For 11, we probably want to be a bit careful with weak references. In
> > https://issues.apache.org/jira/browse/KAFKA-1989, we initially tried an
> > implementation based on weak references, but abandoned it due to too much
> > GC overhead. It probably also makes the code a bit harder to understand.
> > So, perhaps it would be better if we can avoid it.
> >
> > For 12, that's a good point. I thought the selector would do some
> > shuffling for fairness. Perhaps we should stop allocating from the buffer
> > pool when we see the first key whose memory can't be honored?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Sat, Sep 24, 2016 at 8:44 AM, radai <radai.rosenbl...@gmail.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > 10 - I'll add this functionality to the mute/unmute branch. As every
> > > mute/unmute operation is O(#connections / #selectorThreads), maybe a
> > > watermark approach is better than waking when _any_ memory is available?
> > >
> > > 11 - "gc notifications" are done by using a ReferenceQueue (
> > > https://docs.oracle.com/javase/8/docs/api/java/lang/
> > > ref/ReferenceQueue.html)
> > > in combination with weak references to allocated buffers. when a buffer
> > is
> > > reclaimed by the GC the corresponding weak ref to it is enqueued. the
> > pool
> > > maintains a set of outstanding buffer IDs (every allocated buffer gets
> a
> > > unique id - basically a sequence). a buffer explicitly returned has its
> > id
> > > removed from the tracking set and the weak reference to it destroyed,
> so
> > > its reference will never be enqueued by the GC even if it is GC'ed
> later.
> > > an enqueued reference (which indicates a buffer not returned to pool)
> > also
> > > carries the buffer id, which is then removed from the outstanding
> buffers
> > > set and the memory marked as available (and a warning printed). the
> pool
> > > has a background thread dedicated to reading references out of the
> queue
> > > (which under normal conditions remains blocked forever).
> > >
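> > > an illustrative sketch of the mechanism (allocation and the actual
> > > accounting are elided; names are illustrative, not the exact branch
> > > code):
> > >
> > > import java.lang.ref.ReferenceQueue;
> > > import java.lang.ref.WeakReference;
> > > import java.nio.ByteBuffer;
> > > import java.util.Map;
> > > import java.util.concurrent.ConcurrentHashMap;
> > >
> > > public class GcAwarePool {
> > >     // weak ref that remembers the buffer's id and size
> > >     static class BufferRef extends WeakReference<ByteBuffer> {
> > >         final long id;
> > >         final int size;
> > >         BufferRef(long id, ByteBuffer buf, ReferenceQueue<ByteBuffer> q) {
> > >             super(buf, q);
> > >             this.id = id;
> > >             this.size = buf.capacity();
> > >         }
> > >     }
> > >
> > >     private final ReferenceQueue<ByteBuffer> gcQueue = new ReferenceQueue<>();
> > >     private final Map<Long, BufferRef> outstanding = new ConcurrentHashMap<>();
> > >
> > >     // body of the background thread: blocks until the GC enqueues a
> > >     // reference to a buffer that was never explicitly released
> > >     void gcWatcherLoop() throws InterruptedException {
> > >         while (true) {
> > >             BufferRef ref = (BufferRef) gcQueue.remove(); // blocks
> > >             if (outstanding.remove(ref.id) != null) {
> > >                 markAvailable(ref.size); // reclaim the leaked capacity
> > >                 // ...and print a warning about the leak
> > >             }
> > >         }
> > >     }
> > >
> > >     // explicit release: untrack the id so the ref is never enqueued
> > >     void release(long id, ByteBuffer buf) {
> > >         BufferRef ref = outstanding.remove(id);
> > >         if (ref != null)
> > >             ref.clear(); // destroy the weak reference
> > >         markAvailable(buf.capacity());
> > >     }
> > >
> > >     private void markAvailable(int bytes) { /* return capacity to pool */ }
> > > }
> > >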
> > > 12 - the issue here is that a single "large" request (say 1MB) can get
> > > blocked indefinitely under high pressure from much smaller requests
> > > (say 1KB) keeping memory utilization close to 100%. If we don't care
> > > about potential starvation, the change is in a single condition. I'll
> > > make this configurable.
> > >
> > > 13 - I'll change the parameter name.
> > >
> > > On Fri, Sep 23, 2016 at 3:38 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Radai,
> > > >
> > > > Thanks for the updated KIP. A few more questions/comments below.
> > > >
> > > > 10. For "the mute/unmute happens just before poll(), which means as
> > > > a worst case there will be no reads for 300ms if memory was
> > > > unavailable", I am thinking that the memory pool could track whether
> > > > there is any pending request and wake up the selector when memory is
> > > > released and there is a pending request. This way, poll() doesn't
> > > > have to wait for the timeout if memory frees up early.
> > > >
> > > > 11. For "to facilitate faster implementation (as a safety net) the
> > > > pool will be implemented in such a way that memory that was not
> > > > release()ed (but still garbage collected) would be detected and
> > > > "reclaimed". this is to prevent "leaks" in case of code paths that
> > > > fail to release() properly.", could you explain a bit at the high
> > > > level how this is done?
> > > >
> > > > 12. For "As the pool would allow any size request if it has any
> > > > capacity available, the actual memory bound is queued.max.bytes +
> > > > socket.request.max.bytes.", it seems that, intuitively, the pool
> > > > should only give the buffer back if it has enough available bytes.
> > > > Then the request memory can be bounded by queued.max.bytes. We can
> > > > validate that queued.max.bytes is at least socket.request.max.bytes.
> > > >
> > > > 13. For the naming, it seems request.queue.max.bytes is clearer than
> > > > queue.max.bytes.
> > > >
> > > > Jun
> > > >
> > > >
> > > >
> > > > On Thu, Sep 22, 2016 at 10:53 AM, radai <radai.rosenbl...@gmail.com>
> > > > wrote:
> > > >
> > > > > As discussed in the KIP call, I have updated the KIP-72 page (
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
> > > > > to record both configuration validations and implementation
> > > > > concerns. I've also implemented channel muting/unmuting in response
> > > > > to memory pressure. It's available as a separate branch here -
> > > > > https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting.
> > > > > The implementation without muting is here -
> > > > > https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.
> > > > >
> > > > > The mute/unmute happens just before poll(), which means that in
> > > > > the worst case there will be no reads for 300ms if memory was
> > > > > unavailable (that's the timeout until the next poll). Perhaps a
> > > > > design with dedicated read threads could do better (such a thread
> > > > > could actually block waiting for memory), but that would be a giant
> > > > > change.
> > > > >
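> > > > > roughly, the idea (an illustrative sketch, assuming
> > > > > muteAll()/unmuteAll() helpers that iterate over all channels and a
> > > > > pool availableMemory() accessor - not the actual branch code):
> > > > >
> > > > > // in SocketServer.Processor, just before calling poll():
> > > > > boolean outOfMemory = memoryPool.availableMemory() <= 0;
> > > > > if (outOfMemory && !channelsMuted) {
> > > > >     selector.muteAll();      // stop reading from all connections
> > > > >     channelsMuted = true;
> > > > > } else if (!outOfMemory && channelsMuted) {
> > > > >     selector.unmuteAll();    // memory freed up - resume reading
> > > > >     channelsMuted = false;
> > > > > }
> > > > > selector.poll(300);          // hence the 300ms worst case with no reads
> > > > >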
> > > > > On Tue, Sep 13, 2016 at 9:20 AM, radai <radai.rosenbl...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > The specific memory pool implementation I wrote will allocate
> > > > > > _any_ amount you request if it has _any_ memory available (so if
> > > > > > it has 1 byte available and you ask for 1MB, you will get 1MB and
> > > > > > the counter will go negative). This was done to avoid issues with
> > > > > > starvation of large requests. Other implementations may be more
> > > > > > strict. To me this means that generally it's not a simple "have
> > > > > > memory" vs "no memory" split (which gets worse under a
> > > > > > hypothetical tiered pool scheme for QoS).
> > > > > >
> > > > > > To allow this flexibility in pool implementation I must preserve
> > > > > > the amount of memory required. Once read from the channel I can't
> > > > > > put it back, so I store it?
> > > > > >
> > > > > > On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <
> > > > > > rajinisiva...@googlemail.com> wrote:
> > > > > >
> > > > > >> Is there any value in allowing the 4-byte size to be read even
> > > > > >> when the request memory limit has been reached? If not, you can
> > > > > >> disable OP_READ interest for all channels that are ready inside
> > > > > >> Selector.poll() when the memory limit has been reached, and
> > > > > >> re-enable it before returning from poll(). Perhaps a listener
> > > > > >> that is invoked when the MemoryPool moves from the unavailable
> > > > > >> to the available state can wake up the selector. The changes for
> > > > > >> this should be fairly contained, without any additional channel
> > > > > >> state. And it would avoid the overhead of polls that return
> > > > > >> immediately even when progress cannot be made because the memory
> > > > > >> limit has been reached.
> > > > > >>
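> > > > > >> in raw NIO terms the suggestion would look roughly like this
> > > > > >> (illustrative fragment only; mutedKeys is an assumed helper
> > > > > >> set, readyKeys the selector's selected-key set):
> > > > > >>
> > > > > >> // inside Selector.poll(), before handling ready keys:
> > > > > >> if (memoryPool.availableMemory() <= 0) {
> > > > > >>     for (SelectionKey key : readyKeys) {
> > > > > >>         key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
> > > > > >>         mutedKeys.add(key);
> > > > > >>     }
> > > > > >> }
> > > > > >> // ... handle writes/connects as usual ...
> > > > > >> // before returning from poll(): re-enable reads
> > > > > >> for (SelectionKey key : mutedKeys)
> > > > > >>     key.interestOps(key.interestOps() | SelectionKey.OP_READ);
> > > > > >> mutedKeys.clear();
> > > > > >>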
> > > > > >> On Tue, Sep 13, 2016 at 12:31 AM, radai <radai.rosenbl...@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Hi Jun,
> > > > > >> >
> > > > > >> > Yes, you're right - right now the next select() call will
> > > > > >> > return immediately with the same set of keys as earlier (at
> > > > > >> > least), as they were not previously handled (no memory).
> > > > > >> > My assumption is that this happens under considerable load -
> > > > > >> > something has to be occupying all this memory. Also, this
> > > > > >> > happens in the context of SocketServer.Processor.run():
> > > > > >> >
> > > > > >> > while (isRunning) {
> > > > > >> >    configureNewConnections()
> > > > > >> >    processNewResponses()
> > > > > >> >    poll()   <------ HERE
> > > > > >> >    processCompletedReceives()
> > > > > >> >    processCompletedSends()
> > > > > >> >    processDisconnected()
> > > > > >> > }
> > > > > >> >
> > > > > >> > Even within poll(), things like finishConnection(), prepare(),
> > > > > >> > and write()s can still make progress under low-memory
> > > > > >> > conditions. And given the load, there's probably progress to
> > > > > >> > be made in processCompletedReceives(), processCompletedSends()
> > > > > >> > and processDisconnected().
> > > > > >> >
> > > > > >> > If there's progress to be made in other things, it's likely
> > > > > >> > that the next call to poll() will not happen immediately, and
> > > > > >> > so the loop won't be that tight. In order for this to devolve
> > > > > >> > into true busy-waiting you would need a situation where no
> > > > > >> > progress can be made on any in-progress requests and there are
> > > > > >> > no responses to send out?
> > > > > >> >
> > > > > >> > If my assumption does not hold then you are correct, and the
> > > > > >> > selector.poll(300) currently hardcoded in
> > > > > >> > SocketServer.Processor.poll() would need to be replaced with
> > > > > >> > something more complicated. My biggest point of concern,
> > > > > >> > though, is that the resulting code would be complicated and
> > > > > >> > would couple Selector to the memory pool very tightly. Under
> > > > > >> > my current patch, Selector needs the memory pool only to pass
> > > > > >> > to channels when they are built. This would allow different
> > > > > >> > memory pools relatively easily for things like reserving
> > > > > >> > memory for cross-broker replication and high-SLA connections.
> > > > > >> > A tighter coupling would make any such future modification
> > > > > >> > hard.
> > > > > >> >
> > > > > >> > On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao <j...@confluent.io>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > Hi, Radai,
> > > > > >> > >
> > > > > >> > > Thanks for the reply. I still have a followup question on #2.
> > > > > >> > >
> > > > > >> > > My understanding is that in your proposal, the selector will
> > > > > >> > > now first read the size of the Receive. If there is not
> > > > > >> > > enough memory, it has to turn off the READ interest bit for
> > > > > >> > > the corresponding KafkaChannel. Otherwise, subsequent
> > > > > >> > > selector.poll() calls will always return immediately, adding
> > > > > >> > > unnecessary overhead. If you do that, the Selector will need
> > > > > >> > > to know when to turn on the READ interest bit again. It may
> > > > > >> > > not be enough to do this check at the next poll call, since
> > > > > >> > > the timeout used by poll() could be arbitrarily large. So, it
> > > > > >> > > seems that some kind of coordination between the Selector and
> > > > > >> > > the bufferpool is needed?
> > > > > >> > >
> > > > > >> > > Jun
> > > > > >> > >
> > > > > >> > > On Thu, Sep 8, 2016 at 7:02 PM, radai <radai.rosenbl...@gmail.com>
> > > > > >> > > wrote:
> > > > > >> > >
> > > > > >> > > > Hi Jun,
> > > > > >> > > >
> > > > > >> > > > 1. Yes, it is my own personal opinion that people use
> > > > > >> > > > queued.max.requests as an indirect way to bound memory
> > > > > >> > > > consumption. Once a more direct memory-bound mechanism
> > > > > >> > > > exists (and works) I don't think queued.max.requests would
> > > > > >> > > > be required. Having said that, I was not planning on making
> > > > > >> > > > any changes w.r.t. queued.max.requests support (so I was
> > > > > >> > > > aiming to get to a situation where both configs are
> > > > > >> > > > supported) to allow gathering enough data/feedback.
> > > > > >> > > >
> > > > > >> > > > 2. Selector.poll() calls into KafkaChannel.read() to maybe
> > > > > >> > > > get a NetworkReceive. Multiple such read() calls may be
> > > > > >> > > > required until a Receive is produced, already in the
> > > > > >> > > > current code base. My pool implementation is non-blocking,
> > > > > >> > > > so if there's no memory available the read() call will
> > > > > >> > > > return null. poll() would then move on to try and service
> > > > > >> > > > other selection keys. The pool will be checked for
> > > > > >> > > > available memory again the next time the SocketServer.run()
> > > > > >> > > > loop gets to poll(). So right now I don't communicate
> > > > > >> > > > memory becoming available to the selector - it will just
> > > > > >> > > > go on to try and make progress elsewhere and come back
> > > > > >> > > > again; I never block it or send it to sleep. I think for
> > > > > >> > > > efficiency what could maybe be done is: if there's not
> > > > > >> > > > enough memory to service a readable selection key, we may
> > > > > >> > > > want to skip all other read-ready selection keys for that
> > > > > >> > > > iteration of pollSelectionKeys(). That would require rather
> > > > > >> > > > invasive changes around Selector.pollSelectionKeys() that
> > > > > >> > > > I'd rather avoid. Also, different KafkaChannels may be
> > > > > >> > > > backed by different memory pools (under some sort of
> > > > > >> > > > future QoS scheme?), which would complicate such an
> > > > > >> > > > optimization further.
> > > > > >> > > >
> > > > > >> > > > 3. I added the pool interface and implementation under
> > > > > >> > > > kafka.common.memory, and the API is "thin" enough to be
> > > > > >> > > > generally useful (currently it's non-blocking only, but a
> > > > > >> > > > get(long maxWait) is definitely doable). Having said that,
> > > > > >> > > > I'm not really familiar enough with the code to say....
> > > > > >> > > >
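> > > > > >> > > > (a blocking get could be layered on top of the
> > > > > >> > > > non-blocking call - an illustrative sketch, assuming
> > > > > >> > > > release() calls lock.notifyAll(); not actual code:)
> > > > > >> > > >
> > > > > >> > > > public ByteBuffer get(int sizeBytes, long maxWaitMs)
> > > > > >> > > >         throws InterruptedException {
> > > > > >> > > >     long deadline = System.currentTimeMillis() + maxWaitMs;
> > > > > >> > > >     synchronized (lock) {
> > > > > >> > > >         ByteBuffer buf = tryAllocate(sizeBytes);
> > > > > >> > > >         while (buf == null) {
> > > > > >> > > >             long remaining = deadline - System.currentTimeMillis();
> > > > > >> > > >             if (remaining <= 0)
> > > > > >> > > >                 return null; // timed out waiting for memory
> > > > > >> > > >             lock.wait(remaining);
> > > > > >> > > >             buf = tryAllocate(sizeBytes);
> > > > > >> > > >         }
> > > > > >> > > >         return buf;
> > > > > >> > > >     }
> > > > > >> > > > }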
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao <j...@confluent.io>
> > > > > >> > > > wrote:
> > > > > >> > > >
> > > > > >> > > > > Hi, Radai,
> > > > > >> > > > >
> > > > > >> > > > > Thanks for the update. At the high level, this looks
> > > > > >> > > > > promising. A few comments below.
> > > > > >> > > > >
> > > > > >> > > > > 1. If we can bound the requests by bytes, it seems that
> > > > > >> > > > > we don't need queued.max.requests any more? Could we just
> > > > > >> > > > > deprecate the config and make the queue size unbounded?
> > > > > >> > > > > 2. How do we communicate back to the selector when some
> > > > > >> > > > > memory is freed up? We probably need to wake up the
> > > > > >> > > > > selector. For efficiency, perhaps we only need to wake up
> > > > > >> > > > > the selector if the bufferpool is full?
> > > > > >> > > > > 3. We talked about bounding the consumer's memory before.
> > > > > >> > > > > To fully support that, we will need to bound the memory
> > > > > >> > > > > used by different fetch responses in the consumer. Do you
> > > > > >> > > > > think the changes that you propose here can be leveraged
> > > > > >> > > > > to bound the memory in the consumer as well?
> > > > > >> > > > >
> > > > > >> > > > > Jun
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > On Tue, Aug 30, 2016 at 10:41 AM, radai <radai.rosenbl...@gmail.com>
> > > > > >> > > > > wrote:
> > > > > >> > > > >
> > > > > >> > > > > > My apologies for the delay in response.
> > > > > >> > > > > >
> > > > > >> > > > > > I agree with the concerns about OOM-ing while reading
> > > > > >> > > > > > from the actual sockets and about blocking the network
> > > > > >> > > > > > threads - messing with the request queue itself would
> > > > > >> > > > > > not do.
> > > > > >> > > > > >
> > > > > >> > > > > > I propose instead a memory pool approach - the broker
> > > > > >> > > > > > would have a non-blocking memory pool. Upon reading the
> > > > > >> > > > > > first 4 bytes out of a socket, an attempt would be made
> > > > > >> > > > > > to acquire enough memory, and if that attempt fails the
> > > > > >> > > > > > processing thread will move on to try and make progress
> > > > > >> > > > > > with other tasks.
> > > > > >> > > > > >
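> > > > > >> > > > > > an illustrative fragment of that read path (not the
> > > > > >> > > > > > actual patch; field and method names are assumed):
> > > > > >> > > > > >
> > > > > >> > > > > > // in the channel's receive logic:
> > > > > >> > > > > > if (size < 0) {                        // size not read yet
> > > > > >> > > > > >     channel.read(sizeBuffer);          // the 4-byte size is always read
> > > > > >> > > > > >     if (!sizeBuffer.hasRemaining())
> > > > > >> > > > > >         size = sizeBuffer.getInt(0);
> > > > > >> > > > > > }
> > > > > >> > > > > > if (size >= 0 && payload == null) {
> > > > > >> > > > > >     payload = pool.tryAllocate(size);  // may fail under memory pressure
> > > > > >> > > > > >     if (payload == null)
> > > > > >> > > > > >         return null;                   // no memory - retry on a later poll
> > > > > >> > > > > > }
> > > > > >> > > > > >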
> > > > > >> > > > > > I think it's simpler than mute/unmute, because using
> > > > > >> > > > > > mute/unmute would require differentiating between
> > > > > >> > > > > > sockets muted due to a request in progress (normal
> > > > > >> > > > > > current operation) and sockets muted due to lack of
> > > > > >> > > > > > memory. Sockets of the first kind would be unmuted at
> > > > > >> > > > > > the end of request processing (as happens right now),
> > > > > >> > > > > > but the second kind would require some sort of "unmute
> > > > > >> > > > > > watchdog", which is (I claim) more complicated than a
> > > > > >> > > > > > memory pool. Also, a memory pool is a more generic
> > > > > >> > > > > > solution.
> > > > > >> > > > > >
> > > > > >> > > > > > I've updated the KIP page (
> > > > > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
> > > > > >> > > > > > to reflect the new proposed implementation, and I've
> > > > > >> > > > > > also put up an initial implementation proposal on
> > > > > >> > > > > > github -
> > > > > >> > > > > > https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool.
> > > > > >> > > > > > The proposed code is not complete and tested yet (so
> > > > > >> > > > > > probably buggy), but it does include the main points of
> > > > > >> > > > > > modification.
> > > > > >> > > > > >
> > > > > >> > > > > > The specific implementation of the pool on that branch
> > > > > >> > > > > > also has a built-in safety net, where memory that is
> > > > > >> > > > > > acquired but not released (which is a bug) is
> > > > > >> > > > > > discovered when the garbage collector frees it and the
> > > > > >> > > > > > capacity is reclaimed.
> > > > > >> > > > > >
> > > > > >> > > > > > On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao <j...@confluent.io>
> > > > > >> > > > > > wrote:
> > > > > >> > > > > >
> > > > > >> > > > > > > Radai,
> > > > > >> > > > > > >
> > > > > >> > > > > > > Yes, I got the benefit of bounding the request queue
> > > > > >> > > > > > > by bytes. My concern is the following, if we don't
> > > > > >> > > > > > > change the behavior of the processor blocking on
> > > > > >> > > > > > > queue full.
> > > > > >> > > > > > >
> > > > > >> > > > > > > If the broker truly doesn't have enough memory for
> > > > > >> > > > > > > buffering outstanding requests from all connections,
> > > > > >> > > > > > > we have to either hit OOM or block the processor.
> > > > > >> > > > > > > Both will be bad. I am not sure if one is clearly
> > > > > >> > > > > > > better than the other. In this case, the solution is
> > > > > >> > > > > > > probably to expand the cluster to reduce the
> > > > > >> > > > > > > per-broker request load.
> > > > > >> > > > > > >
> > > > > >> > > > > > > If the broker actually has enough memory, we want to
> > > > > >> > > > > > > be able to configure the request queue in such a way
> > > > > >> > > > > > > that it never blocks. You can tell people to just set
> > > > > >> > > > > > > the request queue to be unbounded, which may scare
> > > > > >> > > > > > > them. If we do want to put a bound, it seems it's
> > > > > >> > > > > > > easier to configure the queue size based on #
> > > > > >> > > > > > > requests. Basically, we can tell people to set the
> > > > > >> > > > > > > queue size based on the number of connections. If the
> > > > > >> > > > > > > queue is based on bytes, it's not clear how people
> > > > > >> > > > > > > should set it w/o causing the processor to block.
> > > > > >> > > > > > >
> > > > > >> > > > > > > Finally, Rajini has a good point. The ByteBuffer in
> > > > > >> > > > > > > the request object is allocated as soon as we see the
> > > > > >> > > > > > > first 4 bytes from the socket. So, I am not sure if
> > > > > >> > > > > > > just bounding the request queue itself is enough to
> > > > > >> > > > > > > bound the memory related to requests.
> > > > > >> > > > > > >
> > > > > >> > > > > > > Thanks,
> > > > > >> > > > > > >
> > > > > >> > > > > > > Jun
> > > > > >> > > > > > >
> > > > > >> > > > > > >
> > > > > >> > > > > > >
> > > > > >> > > > > > > On Mon, Aug 8, 2016 at 4:46 PM, radai <radai.rosenbl...@gmail.com>
> > > > > >> > > > > > > wrote:
> > > > > >> > > > > > >
> > > > > >> > > > > > > > I agree that filling up the request queue can
> > > > > >> > > > > > > > cause clients to time out (and presumably retry?).
> > > > > >> > > > > > > > However, for the workloads where we expect this
> > > > > >> > > > > > > > configuration to be useful, the alternative is
> > > > > >> > > > > > > > currently an OOM crash.
> > > > > >> > > > > > > > In my opinion an initial implementation of this
> > > > > >> > > > > > > > feature could be constrained to a simple drop-in
> > > > > >> > > > > > > > replacement of ArrayBlockingQueue (conditional,
> > > > > >> > > > > > > > opt-in), and further study of behavior patterns
> > > > > >> > > > > > > > under load can drive future changes to the API
> > > > > >> > > > > > > > later, when those behaviors are better understood
> > > > > >> > > > > > > > (like back-pressure, no-op filler responses to
> > > > > >> > > > > > > > avoid client timeouts, or whatever).
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat <gharatmayures...@gmail.com>
> > > > > >> > > > > > > > wrote:
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > > Nice write-up, Radai.
> > > > > >> > > > > > > > > I think what Jun said is a valid concern.
> > > > > >> > > > > > > > > If I am not wrong, as per the proposal we are
> > > > > >> > > > > > > > > depending on the entire pipeline to flow smoothly
> > > > > >> > > > > > > > > from accepting requests to handling them, calling
> > > > > >> > > > > > > > > KafkaApis and handing back the responses.
> > > > > >> > > > > > > > >
> > > > > >> > > > > > > > > Thanks,
> > > > > >> > > > > > > > >
> > > > > >> > > > > > > > > Mayuresh
> > > > > >> > > > > > > > >
> > > > > >> > > > > > > > >
> > > > > >> > > > > > > > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy <jjkosh...@gmail.com>
> > > > > >> > > > > > > > > wrote:
> > > > > >> > > > > > > > >
> > > > > >> > > > > > > > > > >
> > > > > >> > > > > > > > > > > Hi Becket,
> > > > > >> > > > > > > > > > >
> > > > > >> > > > > > > > > > > I don't think progress can be made in the
> > > > > >> > > > > > > > > > > processor's run loop if the queue fills up.
> > > > > >> > > > > > > > > > > i.e., I think Jun's point is that if the
> > > > > >> > > > > > > > > > > queue is full (either due to the proposed
> > > > > >> > > > > > > > > > > max.bytes or today due to max.requests
> > > > > >> > > > > > > > > > > hitting the limit) then
> > > > > >> > > > > > > > > > > processCompletedReceives will block and no
> > > > > >> > > > > > > > > > > further progress can be made.
> > > > > >> > > > > > > > > > >
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > > I'm sorry - this isn't right. There will be
> > > > > >> > > > > > > > > > progress as long as the API handlers are able
> > > > > >> > > > > > > > > > to pick requests off the request queue and add
> > > > > >> > > > > > > > > > the responses to the response queues (which are
> > > > > >> > > > > > > > > > effectively unbounded). However, the point is
> > > > > >> > > > > > > > > > valid that blocking in the request channel's
> > > > > >> > > > > > > > > > put has the effect of exacerbating the pressure
> > > > > >> > > > > > > > > > on the socket server.
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao <j...@confluent.io>
> > > > > >> > > > > > > > > > >> wrote:
> > > > > >> > > > > > > > > > >>
> > > > > >> > > > > > > > > > >> > Radai,
> > > > > >> > > > > > > > > > >> >
> > > > > >> > > > > > > > > > >> > Thanks for the proposal. A couple of
> > > > > >> > > > > > > > > > >> > comments on this.
> > > > > >> > > > > > > > > > >> >
> > > > > >> > > > > > > > > > >> > 1. Since we store request objects in the
> > > > > >> > > > > > > > > > >> > request queue, how do we get an accurate
> > > > > >> > > > > > > > > > >> > size estimate for those requests?
> > > > > >> > > > > > > > > > >> >
> > > > > >> > > > > > > > > > >> > 2. Currently, it's bad if the processor
> > > > > >> > > > > > > > > > >> > blocks on adding a request to the request
> > > > > >> > > > > > > > > > >> > queue. Once blocked, the processor can't
> > > > > >> > > > > > > > > > >> > process the sending of responses on other
> > > > > >> > > > > > > > > > >> > socket keys either. This will cause all
> > > > > >> > > > > > > > > > >> > clients in this processor with an
> > > > > >> > > > > > > > > > >> > outstanding request to eventually time
> > > > > >> > > > > > > > > > >> > out. Typically, this will trigger
> > > > > >> > > > > > > > > > >> > client-side retries, which will add more
> > > > > >> > > > > > > > > > >> > load on the broker and cause potentially
> > > > > >> > > > > > > > > > >> > more congestion in the request queue.
> > > > > >> > > > > > > > > > >> > With queued.max.requests, to prevent
> > > > > >> > > > > > > > > > >> > blocking on the request queue, our
> > > > > >> > > > > > > > > > >> > recommendation is to configure
> > > > > >> > > > > > > > > > >> > queued.max.requests to be the same as the
> > > > > >> > > > > > > > > > >> > number of socket connections on the
> > > > > >> > > > > > > > > > >> > broker. Since the broker never processes
> > > > > >> > > > > > > > > > >> > more than 1 request per connection at a
> > > > > >> > > > > > > > > > >> > time, the request queue will never be
> > > > > >> > > > > > > > > > >> > blocked. With queued.max.bytes, it's
> > > > > >> > > > > > > > > > >> > going to be harder to configure the value
> > > > > >> > > > > > > > > > >> > properly to prevent blocking.
> > > > > >> > > > > > > > > > >> >
> > > > > >> > > > > > > > > > >> > So, while adding queued.max.bytes is
> > > > > >> > > > > > > > > > >> > potentially useful for memory management,
> > > > > >> > > > > > > > > > >> > we probably need to address the processor
> > > > > >> > > > > > > > > > >> > blocking issue for it to be really useful
> > > > > >> > > > > > > > > > >> > in practice. One possibility is to apply
> > > > > >> > > > > > > > > > >> > back-pressure to the client when the
> > > > > >> > > > > > > > > > >> > request queue is blocked. For example, if
> > > > > >> > > > > > > > > > >> > the processor notices that the request
> > > > > >> > > > > > > > > > >> > queue is full, it can turn off the
> > > > > >> > > > > > > > > > >> > interest bit for read for all socket
> > > > > >> > > > > > > > > > >> > keys. This will allow the processor to
> > > > > >> > > > > > > > > > >> > continue handling responses. When the
> > > > > >> > > > > > > > > > >> > request queue has space again, it can
> > > > > >> > > > > > > > > > >> > indicate the new state to the processor
> > > > > >> > > > > > > > > > >> > and wake up the selector. Not sure how
> > > > > >> > > > > > > > > > >> > this will work with multiple processors
> > > > > >> > > > > > > > > > >> > though, since the request queue is shared
> > > > > >> > > > > > > > > > >> > across all processors.
> > > > > >> > > > > > > > > > >> >
> > > > > >> > > > > > > > > > >> > Thanks,
> > > > > >> > > > > > > > > > >> >
> > > > > >> > > > > > > > > > >> > Jun
> > > > > >> > > > > > > > > > >> >
> > > > > >> > > > > > > > > > >> >
> > > > > >> > > > > > > > > > >> >
> > > > > >> > > > > > > > > > >> > On Thu, Aug 4, 2016 at 11:28 AM, radai <radai.rosenbl...@gmail.com>
> > > > > >> > > > > > > > > > >> > wrote:
> > > > > >> > > > > > > > > > >> >
> > > > > >> > > > > > > > > > >> > > Hello,
> > > > > >> > > > > > > > > > >> > >
> > > > > >> > > > > > > > > > >> > > I'd like to initiate a discussion about
> > > > > >> > > > > > > > > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
> > > > > >> > > > > > > > > > >> > >
> > > > > >> > > > > > > > > > >> > > The goal of the KIP is to allow
> > > > > >> > > > > > > > > > >> > > configuring a bound on the capacity (as
> > > > > >> > > > > > > > > > >> > > in bytes of memory used) of the
> > > > > >> > > > > > > > > > >> > > incoming request queue, in addition to
> > > > > >> > > > > > > > > > >> > > the current bound on the number of
> > > > > >> > > > > > > > > > >> > > messages.
> > > > > >> > > > > > > > > > >> > >
> > > > > >> > > > > > > > > > >> > > This comes after several incidents at
> > > > > >> > > > > > > > > > >> > > LinkedIn where a sudden "spike" of
> > > > > >> > > > > > > > > > >> > > large message batches caused an
> > > > > >> > > > > > > > > > >> > > out-of-memory exception.
> > > > > >> > > > > > > > > > >> > >
> > > > > >> > > > > > > > > > >> > > Thank you,
> > > > > >> > > > > > > > > > >> > >
> > > > > >> > > > > > > > > > >> > >    Radai
> > > > > >> > > > > > > > > > >> > >
> > > > > >> > > > > > > > > --
> > > > > >> > > > > > > > > -Regards,
> > > > > >> > > > > > > > > Mayuresh R. Gharat
> > > > > >> > > > > > > > > (862) 250-7125
> > > > > >> > > > > > > > >
> > > > > >> --
> > > > > >> Regards,
> > > > > >>
> > > > > >> Rajini
