Hi Radai,

Thanks for the update. At a high level, this looks promising. A few
comments below.

1. If we can bound the requests by bytes, it seems we no longer need
queued.max.requests. Could we just deprecate that config and leave the
queue size (in number of requests) unbounded?
2. How do we communicate back to the selector when some memory is freed up?
We probably need to wake up the selector. For efficiency, perhaps we only
need to wake up the selector if the buffer pool was full (a sketch follows
below the list)?
3. We talked about bounding the consumer's memory before. To fully support
that, we will need to bound the memory used by different fetch responses in
the consumer. Do you think the changes that you propose here can be
leveraged to bound the memory in the consumer as well?
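
Regarding 2, here is the sketch I mentioned (names are hypothetical; the
actual pool API is whatever the implementation settles on):

  import java.nio.ByteBuffer;
  import java.nio.channels.Selector;
  import java.util.concurrent.atomic.AtomicLong;

  public class PoolWakeupSketch {
      private final AtomicLong availableMemory; // bytes left in the pool
      private final Selector selector;          // the processor's selector

      public PoolWakeupSketch(long capacityBytes, Selector selector) {
          this.availableMemory = new AtomicLong(capacityBytes);
          this.selector = selector;
      }

      // Returning memory to the pool: only wake the selector when the pool
      // transitions from exhausted back to having capacity, so a busy pool
      // does not cause a wakeup storm.
      public void release(ByteBuffer buffer) {
          boolean wasExhausted = availableMemory.get() <= 0; // simplistic
          availableMemory.addAndGet(buffer.capacity());
          if (wasExhausted) {
              selector.wakeup(); // let the processor retry pending reads
          }
      }
  }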

Jun


On Tue, Aug 30, 2016 at 10:41 AM, radai <radai.rosenbl...@gmail.com> wrote:

> My apologies for the delay in response.
>
> I agree with the concerns about OOMing while reading from the actual
> sockets and about blocking the network threads - messing with the request
> queue alone would not do.
>
> Instead, I propose a memory pool approach: the broker would have a
> non-blocking memory pool. Upon reading the first 4 bytes out of a socket
> (the request size), an attempt would be made to acquire enough memory,
> and if that attempt fails the processing thread would move on to try and
> make progress with other tasks.
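>
> To make this concrete, the rough pool contract I have in mind (names are
> illustrative, not final):
>
>   import java.nio.ByteBuffer;
>
>   // a non-blocking memory pool: tryAllocate() either hands back a buffer
>   // immediately or returns null, so a network thread never blocks
>   // waiting for memory
>   public interface MemoryPool {
>       // attempts to allocate sizeBytes; returns null if the pool cannot
>       // satisfy the request right now (the caller retries later)
>       ByteBuffer tryAllocate(int sizeBytes);
>
>       // returns a previously allocated buffer's capacity to the pool
>       void release(ByteBuffer previouslyAllocated);
>   }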
>
> I think it's simpler than mute/unmute because using mute/unmute would
> require differentiating between sockets muted due to a request in progress
> (normal current operation) and sockets muted due to lack of memory. Sockets
> of the first kind would be unmuted at the end of request processing (as
> happens right now), but the second kind would require some sort of "unmute
> watchdog", which is (I claim) more complicated than a memory pool. Also, a
> memory pool is a more generic solution.
>
> I've updated the KIP page (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
> to reflect the new proposed implementation, and I've also put up an
> initial implementation proposal on GitHub -
> https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool. The
> proposed code is not complete or fully tested yet (so probably buggy), but
> it does include the main points of modification.
>
> The specific implementation of the pool on that branch also has a
> built-in safety net: memory that is acquired but never released (which
> would be a bug) is discovered when the garbage collector frees it, and
> its capacity is reclaimed.
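>
> In rough terms (simplified relative to the actual branch, so details may
> differ): every buffer handed out is tracked with a weak reference
> registered against a ReferenceQueue. If a buffer gets garbage collected
> without release() having been called - i.e. a leak - its reference
> surfaces on the queue, and a background thread adds the lost capacity
> back to the pool:
>
>   import java.lang.ref.ReferenceQueue;
>   import java.lang.ref.WeakReference;
>   import java.nio.ByteBuffer;
>
>   // the pool keeps strong references to these trackers (e.g. in a set)
>   // until a buffer is either released properly or found leaked;
>   // otherwise the trackers themselves would be collected
>   class BufferReference extends WeakReference<ByteBuffer> {
>       final int sizeBytes;
>       BufferReference(ByteBuffer b, ReferenceQueue<ByteBuffer> q, int s) {
>           super(b, q);
>           this.sizeBytes = s;
>       }
>   }
>
>   // background thread, draining leaked trackers and reclaiming capacity
>   // (InterruptedException handling omitted):
>   //   BufferReference ref = (BufferReference) referenceQueue.remove();
>   //   availableMemory.addAndGet(ref.sizeBytes);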
>
> On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Radai,
> >
> > Yes, I got the benefit of bounding the request queue by bytes. My concern
> > is the following if we don't change the behavior of the processor
> > blocking when the queue is full.
> >
> > If the broker truly doesn't have enough memory for buffering outstanding
> > requests from all connections, we have to either hit OOM or block the
> > processor. Both will be bad, and I am not sure one is clearly better than
> > the other. In this case, the solution is probably to expand the cluster
> > to reduce the per-broker request load.
> >
> > If the broker actually has enough memory, we want to be able to configure
> > the request queue in such a way that it never blocks. We could tell
> > people to just make the request queue unbounded, which may scare them. If
> > we do want to put a bound, it's easier to configure the queue size based
> > on the number of requests: basically, we can tell people to set the queue
> > size based on the number of connections. If the queue is bounded by
> > bytes, it's not clear how people should set it without causing the
> > processor to block.
> >
> > Finally, Rajini has a good point. The ByteBuffer in the request object is
> > allocated as soon as we see the first 4 bytes from the socket. So, I am
> > not sure if just bounding the request queue itself is enough to bound the
> > memory related to requests.
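> >
> > (For context, the receive path roughly does the following - a sketch,
> > not the exact code - which is why the allocation happens well before
> > the request ever reaches the queue:)
> >
> >   sizeBuffer.rewind();
> >   int requestSize = sizeBuffer.getInt();  // the 4-byte size prefix
> >   ByteBuffer payload = ByteBuffer.allocate(requestSize); // up-front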
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Mon, Aug 8, 2016 at 4:46 PM, radai <radai.rosenbl...@gmail.com>
> wrote:
> >
> > > I agree that filling up the request queue can cause clients to time out
> > > (and presumably retry?). However, for the workloads where we expect
> > > this configuration to be useful, the alternative is currently an OOM
> > > crash. In my opinion, an initial implementation of this feature could
> > > be constrained to a simple drop-in replacement of ArrayBlockingQueue
> > > (conditional, opt-in), and further study of behavior patterns under
> > > load can drive future changes to the API when those behaviors are
> > > better understood (like back-pressure, no-op filler responses to avoid
> > > client timeouts, or whatever).
> > >
> > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com>
> > > wrote:
> > >
> > > > Nice write-up, Radai.
> > > > I think what Jun said is a valid concern.
> > > > If I am not wrong, as per the proposal we are depending on the entire
> > > > pipeline to flow smoothly, from accepting requests to handling them,
> > > > calling KafkaApis, and handing back the responses.
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > >
> > > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy <jjkosh...@gmail.com>
> > wrote:
> > > >
> > > > > >
> > > > > > Hi Becket,
> > > > > >
> > > > > > I don't think progress can be made in the processor's run loop
> > > > > > if the queue fills up. That is, I think Jun's point is that if
> > > > > > the queue is full (either due to the proposed max.bytes or today
> > > > > > due to max.requests hitting the limit) then
> > > > > > processCompletedReceives will block and no further progress can
> > > > > > be made.
> > > > > >
> > > > >
> > > > > I'm sorry - this isn't right. There will be progress as long as
> > > > > the API handlers are able to pick requests off the request queue
> > > > > and add the responses to the response queues (which are effectively
> > > > > unbounded). However, the point is valid that blocking in the
> > > > > request channel's put has the effect of exacerbating the pressure
> > > > > on the socket server.
> > > > >
> > > > >
> > > > > >
> > > > > >>
> > > > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao <j...@confluent.io>
> > wrote:
> > > > > >>
> > > > > >> > Radai,
> > > > > >> >
> > > > > >> > Thanks for the proposal. A couple of comments on this.
> > > > > >> >
> > > > > >> > 1. Since we store request objects in the request queue, how
> > > > > >> > do we get an accurate size estimate for those requests?
> > > > > >> >
> > > > > >> > 2. Currently, it's bad if the processor blocks on adding a
> > > > > >> > request to the request queue. Once blocked, the processor
> > > > > >> > can't process the sending of responses for other socket keys
> > > > > >> > either. This will cause all clients in this processor with an
> > > > > >> > outstanding request to eventually time out. Typically, this
> > > > > >> > will trigger client-side retries, which will add more load on
> > > > > >> > the broker and potentially cause more congestion in the
> > > > > >> > request queue. With queued.max.requests, to prevent blocking
> > > > > >> > on the request queue, our recommendation is to configure
> > > > > >> > queued.max.requests to be the same as the number of socket
> > > > > >> > connections on the broker. Since the broker never processes
> > > > > >> > more than 1 request per connection at a time, the request
> > > > > >> > queue will never be blocked. With queued.max.bytes, it's going
> > > > > >> > to be harder to configure the value properly to prevent
> > > > > >> > blocking.
> > > > > >> >
> > > > > >> > So, while adding queued.max.bytes is potentially useful for
> > > > > >> > memory management, we probably need to address the processor
> > > > > >> > blocking issue for it to be really useful in practice. One
> > > > > >> > possibility is to apply back-pressure to the client when the
> > > > > >> > request queue is blocked. For example, if the processor
> > > > > >> > notices that the request queue is full, it can turn off the
> > > > > >> > interest bit for read for all socket keys. This will allow the
> > > > > >> > processor to continue handling responses. When the request
> > > > > >> > queue has space again, it can indicate the new state to the
> > > > > >> > processor and wake up the selector. Not sure how this will
> > > > > >> > work with multiple processors though, since the request queue
> > > > > >> > is shared across all processors.
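> > > > > >> >
> > > > > >> > In NIO terms, the pause/resume per selection key would look
> > > > > >> > roughly like this (sketch only):
> > > > > >> >
> > > > > >> >   // pause reads while the request queue is full
> > > > > >> >   key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
> > > > > >> >   // ...and once the queue has space again, resume reads
> > > > > >> >   key.interestOps(key.interestOps() | SelectionKey.OP_READ);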
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> >
> > > > > >> > Jun
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> > On Thu, Aug 4, 2016 at 11:28 AM, radai <
> > > > > >> > radai.rosenbl...@gmail.com> wrote:
> > > > > >> >
> > > > > >> > > Hello,
> > > > > >> > >
> > > > > >> > > I'd like to initiate a discussion about
> > > > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > >> > > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
> > > > > >> > >
> > > > > >> > > The goal of the KIP is to allow configuring a bound on the
> > > > > >> > > capacity (as in bytes of memory used) of the incoming
> > > > > >> > > request queue, in addition to the current bound on the
> > > > > >> > > number of requests.
> > > > > >> > >
> > > > > >> > > This comes after several incidents at LinkedIn where a
> > > > > >> > > sudden "spike" of large message batches caused an
> > > > > >> > > out-of-memory exception.
> > > > > >> > >
> > > > > >> > > Thank you,
> > > > > >> > >
> > > > > >> > >    Radai
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -Regards,
> > > > Mayuresh R. Gharat
> > > > (862) 250-7125
> > > >
> > >
> >
>
