Yeah, we can definitely do better in the documentation. As for the API
changes, I would prefer to hold off and think through whether such use cases
are a common pattern, and whether, if required, we can even re-order the
closing process to get around the issue I mentioned earlier.

Guozhang

On Mon, Apr 11, 2016 at 4:16 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> What about extending the API with a method beforeClose() that enables
> the user to flush buffered data?
>
> Maybe we can also rename close() to afterClose(), to make the difference
> clear. At least, we should document when close() is called -- from a
> user point of view, I would expect close() to allow flushing data,
> which is not the case.
>
> -Matthias
>
> On 04/11/2016 02:30 AM, Guozhang Wang wrote:
> > Re 1), Kafka Streams intentionally closes all underlying clients before
> > closing processors, since closing some of the processors requires shutting
> > down their processor state managers; for example, we need to make sure the
> > producer's message sends have all been acked before the state manager
> > records the changelog sent offsets. To complement it, we trigger commitAll()
> > before closing the clients.
> >
> >
> > Guozhang
> >
> > On Sun, Apr 10, 2016 at 9:17 AM, Jay Kreps <j...@confluent.io> wrote:
> >
> >> Also, I wonder if this issue is related:
> >> https://issues.apache.org/jira/browse/KAFKA-3135
> >>
> >> -Jay
> >>
> >> On Sun, Apr 10, 2016 at 8:58 AM, Jay Kreps <j...@confluent.io> wrote:
> >>
> >>> Two things:
> >>> 1. Caching data in the processor is a bit dangerous since it will be lost
> >>> on failure. Nonetheless, I think you have a point that we should ideally
> >>> close the processors first, then commit in case they send any messages on
> >>> close.
> >>> 2. The issue you describe shouldn't happen for the reason you describe.
> >>> Both the broker and the consumer handle batches of messages, so fetching a
> >>> single 1 MB message versus 1024 1 KB messages should be the same. The
> >>> proposed max.poll.messages would just affect how many records are handed
> >>> out; they will have been fetched and be in memory in the consumer no matter
> >>> what. I wonder if you could help us track down what's happening for
> >>> you--maybe provide a simple test case that reproduces the problem?
> >>>
> >>>
> >>> On Sun, Apr 10, 2016 at 6:13 AM, Michael D. Coon <
> >>> mdco...@yahoo.com.invalid> wrote:
> >>>
> >>>> Guozhang,
> >>>>    Yes, I'm merging message contents into larger messages before sending
> >>>> to the producer. We have demonstrated that many tiny messages of < 1K
> >>>> cause a tremendous slowdown on the downstream consumers. Not because of
> >>>> memory contention but because of the broker filling up the max fetch
> >>>> request size by adding hundreds of thousands of tiny messages to the fetch
> >>>> response. The consumer then has to deal with those messages and it causes
> >>>> huge latency problems…the broker has to add those hundreds of thousands of
> >>>> messages to the response. It takes > 5 seconds per fetch to return from the
> >>>> broker in most cases. In contrast, when I merge messages into bundled
> >>>> single messages with larger payloads, we get excellent throughput because
> >>>> there is less polling and the number of messages is reduced.
> >>>>    I'm locked into a battle between fetch size constraints and max
> >>>> message size constraints…my max message size can actually spike over 5MB
> >>>> for a single message (non-merged) but most of the time it's < 1K. That's
> >>>> just the kind of data set we're dealing with. So I can't set the fetch size
> >>>> too low or one of these larger messages will come in and prevent the
> >>>> consumer from being able to process anything.
> >>>>    So we either need a way to tell the broker not to fill the max fetch
> >>>> size before returning (max.poll.messages) or I need a way to flush to the
> >>>> producer when Kafka Streams is about to close my producer. The latter
> >>>> offers the benefit of flushing data that may be the results of processing
> >>>> input data whose offsets were already committed asynchronously.
> >>>> Mike
> >>>>
> >>>>     On Saturday, April 9, 2016 2:27 PM, Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>
> >>>>  Mike,
> >>>>
> >>>> It's not clear what you mean by "buffering up the contents". The producer
> >>>> itself already does some buffering and batching when sending to Kafka. Did you
> >>>> actually "merge" multiple small messages into one large message before
> >>>> giving it to the producer in the app code? In either case, I am not sure
> >>>> how it will help with the downstream consumer memory pressure issue.
> >>>>
> >>>> About bounding the consumer memory usage, we already have some thoughts
> >>>> about that issue and plan to add a memory bounding feature like the
> >>>> producer has in the near future (
> >>>> https://issues.apache.org/jira/browse/KAFKA-2045), so it won't be a problem
> >>>> for long. As for the "max.poll.messages" config and 0.10.0, just FYI, we
> >>>> are shooting to have it released at the end of this month.
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Sat, Apr 9, 2016 at 5:59 AM, Michael D. Coon <mdco...@yahoo.com.invalid>
> >>>> wrote:
> >>>>
> >>>>> Guozhang,
> >>>>>    In my processor, I'm buffering up contents of the final messages in
> >>>>> order to make them larger. This is to optimize throughput and prevent tiny
> >>>>> messages from being injected downstream. So nothing is being pushed to the
> >>>>> producer until my configured thresholds are met in the buffering mechanism.
> >>>>> So as it stands, these messages are left dangling after the producer closes
> >>>>> and, even worse, if periodic commits are happening behind the scenes, the
> >>>>> data is lost on restart.
> >>>>>    What we need is a way to notify the processors that everything is
> >>>>> "about" to close so that I can properly flush what I have in memory out to
> >>>>> the producer. Otherwise, I'm stuck with always sending tiny messages into
> >>>>> Kafka--which I know for certain causes problems on downstream consumers
> >>>>> (where they set a high fetch memory size and it causes hundreds of
> >>>>> thousands of messages to be retrieved at a time…and thus bogs down the
> >>>>> consumer). I think the "max.poll.messages" setting we discussed before
> >>>>> would help here but if it's not available until 0.10, I'm kind of stuck.
> >>>>>    Another option might be to disable periodic commits and only commit
> >>>>> when the processor requests it. This would mitigate some data loss and is
> >>>>> better than nothing. There is still a chance that data in RecordQueue not
> >>>>> yet sent to my processor would be committed but never processed in this
> >>>>> case.
> >>>>>    Another thought I had was to reduce the max fetch size; however, some
> >>>>> messages can be very large (i.e. data spikes periodically). In this case,
> >>>>> the message size would exceed my lower max fetch size, causing the consumer
> >>>>> to simply stop consuming. So I'm stuck. So either we need to roll in
> >>>>> max.poll.messages sooner than 0.10 or maybe add a callback mechanism letting
> >>>>> me know that the producer is about to close so I can clear my buffers.
> >>>>>    Ideas?
> >>>>> Mike
> >>>>>
> >>>>>    On Friday, April 8, 2016 8:24 PM, Guozhang Wang <wangg...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>
> >>>>>  Hi Michael,
> >>>>>
> >>>>> When you call KafkaStreams.close(), it will first trigger a commitAll()
> >>>>> function, which will 1) flush the local state store if necessary; 2) flush
> >>>>> messages buffered in the producer; 3) commit offsets on the consumer. Then it
> >>>>> will close the producer / consumer clients and shut down the tasks. So when
> >>>>> you see the processor's "close" function triggered, any buffered messages in
> >>>>> the producer should already have been flushed.
> >>>>>
> >>>>> Did you see behavior different from what is described above?
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon <mdco...@yahoo.com.invalid>
> >>>>> wrote:
> >>>>>
> >>>>>> All,
> >>>>>>    I'm seeing my processor's "close" method being called AFTER my
> >>>>>> downstream producer has been closed. I had assumed that on close I would be
> >>>>>> able to flush whatever I had been buffering up to send to the Kafka topic. In
> >>>>>> other words, we've seen significant performance differences in building
> >>>>>> flows with small messages and large messages in/out of Kafka. So my
> >>>>>> processor buffers up messages to a threshold and flushes those as a
> >>>>>> composite message bundle to improve downstream processing. But if this
> >>>>>> close method is called AFTER the producer has already been closed, I would
> >>>>>> have no way to actually flush the final composite bundles to my topic on
> >>>>>> shutdown. Is there some way to get a call BEFORE producer shutdown occurs?
> >>>>>> Mike
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>>
> >>>>
> >>>
> >>>
> >>
> >
> >
> >
>
>


-- 
-- Guozhang
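
For readers following the thread, here is a minimal sketch of the
beforeClose() idea raised in the discussion, assuming a shutdown sequence
that calls the hook while the producer is still open. Every class and method
below (SketchProducer, BundlingProcessor, ShutdownSketch, beforeClose) is a
hypothetical stand-in written for illustration; none of it is the Kafka
Streams API, and the real closing order may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a producer client; NOT the Kafka producer API.
class SketchProducer {
    final List<String> sent = new ArrayList<>();
    private boolean closed = false;

    void send(String message) {
        if (closed) {
            throw new IllegalStateException("producer already closed");
        }
        sent.add(message);
    }

    void close() {
        closed = true;
    }
}

// Hypothetical processor that merges small messages into larger bundles,
// as described in the thread.
class BundlingProcessor {
    private final SketchProducer producer;
    private final int threshold;
    private final List<String> buffer = new ArrayList<>();

    BundlingProcessor(SketchProducer producer, int threshold) {
        this.producer = producer;
        this.threshold = threshold;
    }

    void process(String message) {
        buffer.add(message);
        if (buffer.size() >= threshold) {
            flush();
        }
    }

    // Assumed hook: invoked while the producer is still open.
    void beforeClose() {
        flush();
    }

    // Invoked after the clients are closed; only local cleanup is safe here.
    void close() {
        buffer.clear();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        producer.send(String.join("|", buffer));
        buffer.clear();
    }
}

class ShutdownSketch {
    // Runs the assumed shutdown order and returns what reached the producer.
    static List<String> run() {
        SketchProducer producer = new SketchProducer();
        BundlingProcessor processor = new BundlingProcessor(producer, 3);
        for (String m : new String[] {"a", "b", "c", "d"}) {
            processor.process(m);
        }
        processor.beforeClose(); // flushes the partial bundle "d"
        producer.close();        // clients are closed next
        processor.close();       // processor cleanup runs last
        return new ArrayList<>(producer.sent);
    }
}
```

In this sketch, ShutdownSketch.run() returns the full bundle followed by the
partial one. Without the beforeClose() call, the trailing "d" would still be
sitting in the buffer when the producer closes, which is the data loss Mike
describes.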
