Mike,

It's not clear what you mean by "buffering up the contents". The producer
itself already does some buffering and batching when sending to Kafka. Did
you actually "merge" multiple small messages into one large message in the
app code before handing it to the producer? In either case, I am not sure
how that would help with the downstream consumer memory pressure issue.
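
For concreteness, here is a minimal sketch of the kind of app-level
"merging" I am asking about, as opposed to the producer's own batching. The
class name, topic, threshold, and newline delimiter are all hypothetical
placeholders, not anything from the Kafka APIs:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical app-level "merging": many small values are concatenated into
// one large payload before it is ever handed to the producer.
public class MergingSender {
    private static final int MERGE_THRESHOLD = 1000;   // placeholder
    private final List<String> pending = new ArrayList<>();
    private final Producer<String, String> producer;

    public MergingSender(Properties producerProps) {
        this.producer = new KafkaProducer<>(producerProps);
    }

    public void add(String value) {
        pending.add(value);
        if (pending.size() >= MERGE_THRESHOLD) {
            flush();
        }
    }

    public void flush() {
        if (pending.isEmpty()) return;
        // One ProducerRecord carrying many logical messages.
        producer.send(new ProducerRecord<>("merged-output", String.join("\n", pending)));
        pending.clear();
    }

    public void close() {
        flush();          // push any remainder before shutting the producer down
        producer.close();
    }
}

The point of the question is whether something like this sits between your
processor and the producer, since records held in such a buffer would indeed
be left un-flushed if it is never told that shutdown is coming.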

About bounding the consumer memory usage, we already have some thoughts on
that issue and plan to add a memory-bounding feature, like the one the
producer has, in the near future (
https://issues.apache.org/jira/browse/KAFKA-2045), so it won't be a problem
for long. As for the "max.poll.messages" config and 0.10.0, just FYI we are
shooting to have it released by the end of this month.
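
Once that config is available, bounding the per-poll record count on the
downstream consumer would look roughly like the sketch below. This is only
an illustration: the final config name is assumed here to be
"max.poll.records", and the broker address, group id, and the value 500 are
placeholders.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BoundedPollExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream-consumer");     // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Caps how many records a single poll() returns, bounding what the app
        // must hold in memory at once. Config name assumed, not yet released.
        props.put("max.poll.records", "500");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe(...) and the poll loop would go here
        }
    }
}

Capping the record count per poll() bounds the application-side memory
directly, independent of the byte-based fetch size settings.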

Guozhang


On Sat, Apr 9, 2016 at 5:59 AM, Michael D. Coon <mdco...@yahoo.com.invalid>
wrote:

> Guozhang,
>    In my processor, I'm buffering up the contents of the final messages
> in order to make them larger. This is to optimize throughput and to avoid
> injecting tiny messages downstream. So nothing is pushed to the producer
> until my configured thresholds are met in the buffering mechanism. As it
> stands, these messages are left dangling after the producer closes and,
> even worse, if periodic commits are happening behind the scenes, the data
> is lost on restart.
>    What we need is a way to notify the processors that everything is
> "about" to close so that I can properly flush what I have in memory out to
> the producer. Otherwise, I'm stuck with always sending tiny messages into
> Kafka--which I know for certain causes problems on downstream consumers
> (where a high fetch memory size causes hundreds of thousands of messages
> to be retrieved at a time, and thus bogs down the consumer). I think the
> "max.poll.messages" setting we discussed before would help here, but if
> it's not available until 0.10, I'm kind of stuck.
>     Another option might be to disable periodic commits and only commit
> when the processor requests it. That would mitigate some of the data loss
> and is better than nothing, although there is still a chance that data in
> the RecordQueue not yet sent to my processor would be committed but never
> processed.
>     Another thought I had was to reduce the max fetch size; however, some
> messages can be very large (i.e., data spikes periodically). In that case,
> the message size would exceed the lower max fetch size, causing the
> consumer to simply stop consuming. So I'm stuck: either we need to roll
> out max.poll.messages sooner than 0.10, or we need a callback mechanism
> letting me know that the producer is about to close so I can clear my
> buffers.
>     Ideas?
> Mike
>
>     On Friday, April 8, 2016 8:24 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
>
>  Hi Michael,
>
> When you call KafkaStreams.close(), it will first trigger a commitAll()
> function, which will 1) flush the local state store if necessary; 2) flush
> messages buffered in the producer; 3) commit offsets on the consumer. Then
> it will close the producer / consumer clients and shut down the tasks. So
> when you see the processor's "close" function triggered, any buffered
> messages in the producer should already have been flushed.
>
> Did you see a different behavior than the above described?
>
> Guozhang
>
>
> On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon <mdco...@yahoo.com.invalid
> >
> wrote:
>
> > All,
> >    I'm seeing my processor's "close" method being called AFTER my
> > downstream producer has been closed. I had assumed that on close I would
> > be able to flush whatever I had been buffering up to send to the Kafka
> > topic. In other words, we've seen significant performance differences in
> > building flows with small messages and large messages in/out of Kafka.
> > So my processor buffers up messages to a threshold and flushes those as
> > a composite message bundle to improve downstream processing. But if this
> > close method is called AFTER the producer has already been closed, I
> > would have no way to actually flush the final composite bundles to my
> > topic on shutdown. Is there some way to get a call BEFORE producer
> > shutdown occurs?
> > Mike
> >
> >
>
>
> --
> -- Guozhang
>
>
>
>



-- 
-- Guozhang
