Yeah, we can definitely do better in the documentation. Regarding the
API changes, I would prefer to hold off and think through whether such
use cases are a common pattern, and whether we can re-order the closing
process to get around the issue I mentioned above, if that is required.

Guozhang
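(For readers of the archive: a rough sketch of the close ordering being
discussed, as described in Guozhang's replies further down the thread.
The class, constructor, and closeProcessors hook are illustrative only
-- this is not the actual KafkaStreams implementation.)

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;

    // Rough sketch of the shutdown order discussed in this thread; the
    // structure is illustrative, not the real KafkaStreams internals.
    class ShutdownOrderSketch {
        private final KafkaProducer<byte[], byte[]> producer;
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final Runnable closeProcessors; // runs each Processor.close()

        ShutdownOrderSketch(KafkaProducer<byte[], byte[]> producer,
                            KafkaConsumer<byte[], byte[]> consumer,
                            Runnable closeProcessors) {
            this.producer = producer;
            this.consumer = consumer;
            this.closeProcessors = closeProcessors;
        }

        void close() {
            // commitAll(): flush state stores, flush the producer (so all
            // sends are acked before changelog offsets are recorded), and
            // commit consumer offsets.
            producer.flush();
            consumer.commitSync();
            // The clients are closed BEFORE the processors...
            producer.close();
            consumer.close();
            // ...so anything a Processor tries to forward from its close()
            // can no longer reach Kafka. That is the issue in this thread.
            closeProcessors.run();
        }
    }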
On Mon, Apr 11, 2016 at 4:16 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> What about extending the API with a method beforeClose() that enables
> the user to flush buffered data?
>
> Maybe we can also rename close() to afterClose() to make the difference
> clear. At the very least, we should document when close() is called --
> from a user's point of view, I would expect that close() allows
> flushing data, which is not the case.
>
> -Matthias
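(A minimal sketch of the hook proposed above -- purely hypothetical,
since no beforeClose() exists in the Processor API; the interface below
only illustrates the idea against the 0.10-era lifecycle methods.)

    import org.apache.kafka.streams.processor.ProcessorContext;

    // Hypothetical lifecycle -- beforeClose() does NOT exist in Kafka
    // Streams; this interface only illustrates the proposal above.
    interface ProcessorWithPreClose<K, V> {
        void init(ProcessorContext context);
        void process(K key, V value);
        void punctuate(long timestamp);

        // Proposed: invoked while the producer is still open, so a
        // processor can forward and flush any records it has buffered.
        void beforeClose();

        // Today's close() (afterClose() in the proposal): runs after the
        // clients are shut down, so nothing forwarded here reaches Kafka.
        void close();
    }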
> On 04/11/2016 02:30 AM, Guozhang Wang wrote:
> > Re 1), Kafka Streams intentionally closes all underlying clients
> > before closing the processors, since closing the processors requires
> > shutting down their processor state managers; for example, we need to
> > make sure the producer's message sends have all been acked before the
> > state manager records the changelog offsets. To complement this, we
> > trigger commitAll() before closing the clients.
> >
> > Guozhang
> >
> > On Sun, Apr 10, 2016 at 9:17 AM, Jay Kreps <j...@confluent.io> wrote:
> >
> >> Also, I wonder if this issue is related:
> >> https://issues.apache.org/jira/browse/KAFKA-3135
> >>
> >> -Jay
> >>
> >> On Sun, Apr 10, 2016 at 8:58 AM, Jay Kreps <j...@confluent.io> wrote:
> >>
> >>> Two things:
> >>> 1. Caching data in the processor is a bit dangerous since it will
> >>> be lost on failure. Nonetheless, I think you have a point that we
> >>> should ideally close the processors first, then commit, in case
> >>> they send any messages on close.
> >>> 2. The issue you describe shouldn't happen for the reason you
> >>> describe. Both the broker and the consumer handle batches of
> >>> messages, so fetching a single 1 MB message versus 1024 1 KB
> >>> messages should be the same. The proposed max.poll.messages would
> >>> just affect how many records are handed out; they will have been
> >>> fetched and be in memory in the consumer no matter what. I wonder
> >>> if you could help us trace down what's happening for you -- maybe
> >>> provide a simple test case that reproduces the problem?
> >>>
> >>> On Sun, Apr 10, 2016 at 6:13 AM, Michael D. Coon <
> >>> mdco...@yahoo.com.invalid> wrote:
> >>>
> >>>> Guozhang,
> >>>> Yes, I'm merging message contents into larger messages before
> >>>> sending to the producer. We have demonstrated that many tiny
> >>>> messages of < 1 KB cause a tremendous slowdown in the downstream
> >>>> consumers -- not because of memory contention, but because the
> >>>> broker fills up the max fetch request size by adding hundreds of
> >>>> thousands of tiny messages to the fetch response. The consumer
> >>>> then has to deal with those messages, and it causes huge latency
> >>>> problems: it takes > 5 seconds per fetch to return from the broker
> >>>> in most cases. In contrast, when I merge messages into bundled
> >>>> single messages with larger payloads, we get excellent throughput
> >>>> because there is less polling and the number of messages is
> >>>> reduced.
> >>>> I'm locked in a battle between fetch size constraints and max
> >>>> message size constraints: my max message size can spike over 5 MB
> >>>> for a single (non-merged) message, but most of the time it's
> >>>> < 1 KB. That's just the kind of data set we're dealing with. So I
> >>>> can't set the fetch size too low, or one of these larger messages
> >>>> will come in and prevent the consumer from processing anything.
> >>>> So we either need a way to tell the broker not to fill the max
> >>>> fetch size before returning (max.poll.messages), or I need a way
> >>>> to flush to the producer when it's about to close. The latter
> >>>> offers the benefit of flushing data that may be the result of
> >>>> processing input data whose offsets were already committed
> >>>> asynchronously.
> >>>> Mike
> >>>>
> >>>> On Saturday, April 9, 2016 2:27 PM, Guozhang Wang <
> >>>> wangg...@gmail.com> wrote:
> >>>>
> >>>> Mike,
> >>>>
> >>>> It's not clear what you mean by "buffering up the contents". The
> >>>> producer itself already does some buffering and batching when
> >>>> sending to Kafka. Did you actually "merge" multiple small messages
> >>>> into one large message before giving it to the producer in the
> >>>> application code? In either case, I am not sure how it would help
> >>>> with the downstream consumer's memory pressure issue.
> >>>>
> >>>> About bounding the consumer's memory usage, we already have some
> >>>> thoughts on that issue and plan to add a memory-bounding feature
> >>>> like the producer's in the near future (
> >>>> https://issues.apache.org/jira/browse/KAFKA-2045), so it won't be
> >>>> a problem for long. And as for the "max.poll.messages" config and
> >>>> 0.10.0, just FYI, we are shooting to have it released by the end
> >>>> of this month.
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Sat, Apr 9, 2016 at 5:59 AM, Michael D. Coon
> >>>> <mdco...@yahoo.com.invalid> wrote:
> >>>>
> >>>>> Guozhang,
> >>>>> In my processor, I'm buffering up the contents of the final
> >>>>> messages in order to make them larger. This is to optimize
> >>>>> throughput and keep tiny messages from being injected downstream,
> >>>>> so nothing is pushed to the producer until my configured
> >>>>> thresholds are met in the buffering mechanism. As it stands,
> >>>>> these messages are left dangling after the producer closes and,
> >>>>> even worse, if periodic commits are happening behind the scenes,
> >>>>> the data is lost on restart.
> >>>>> What we need is a way to notify the processors that everything
> >>>>> is "about" to close, so that I can properly flush what I have in
> >>>>> memory out to the producer. Otherwise, I'm stuck always sending
> >>>>> tiny messages into Kafka -- which I know for certain causes
> >>>>> problems on downstream consumers (where a high fetch memory size
> >>>>> causes hundreds of thousands of messages to be retrieved at a
> >>>>> time, and thus bogs down the consumer). I think the
> >>>>> "max.poll.messages" setting we discussed before would help here,
> >>>>> but if it's not available until 0.10, I'm kind of stuck.
> >>>>> Another option might be to disable periodic commits and only
> >>>>> commit when the processor requests it. This would mitigate some
> >>>>> data loss and is better than nothing. There is still a chance
> >>>>> that data in the RecordQueue not yet sent to my processor would
> >>>>> be committed but never processed in this case.
> >>>>> Another thought I had was to reduce the max fetch size; however,
> >>>>> some messages can be very large (i.e., data spikes periodically).
> >>>>> In that case, the message size would exceed my lower max fetch
> >>>>> size, causing the consumer to simply stop consuming. So I'm
> >>>>> stuck. So either we need to roll in max.poll.messages sooner than
> >>>>> 0.10, or maybe add a callback mechanism letting me know that the
> >>>>> producer is about to close so I can clear my buffers.
> >>>>> Ideas?
> >>>>> Mike
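(To make the bind concrete, a sketch of the consumer-side tension Mike
describes above; the class and the values are made up, and
max.partition.fetch.bytes is the new-consumer setting that bounds fetch
bytes per partition.)

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    // Sketch of the fetch-size bind described above; hypothetical values.
    class FetchSizeTension {
        static Properties consumerProps() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Must stay >= the largest single message (spikes over 5 MB
            // here), or the consumer stalls on a message it can never
            // fetch...
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
                      6 * 1024 * 1024);
            // ...but when most messages are < 1 KB, a fetch this large can
            // return hundreds of thousands of records in one poll(). A
            // per-poll record cap (the max.poll.messages idea, which later
            // shipped as max.poll.records) bounds the count rather than
            // the bytes.
            return props;
        }
    }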
> >>>>>
> >>>>> On Friday, April 8, 2016 8:24 PM, Guozhang Wang <
> >>>>> wangg...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Michael,
> >>>>>
> >>>>> When you call KafkaStreams.close(), it will first trigger a
> >>>>> commitAll() function, which will 1) flush the local state store
> >>>>> if necessary; 2) flush messages buffered in the producer; and
> >>>>> 3) commit offsets on the consumer. Then it will close the
> >>>>> producer / consumer clients and shut down the tasks. So when you
> >>>>> see the processor's "close" function triggered, any buffered
> >>>>> messages in the producer should already have been flushed.
> >>>>>
> >>>>> Did you see a different behavior than described above?
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon
> >>>>> <mdco...@yahoo.com.invalid> wrote:
> >>>>>
> >>>>>> All,
> >>>>>> I'm seeing my processor's "close" method being called AFTER my
> >>>>>> downstream producer has been closed. I had assumed that on
> >>>>>> close I would be able to flush whatever I had been buffering up
> >>>>>> to send to the Kafka topic. In other words, we've seen
> >>>>>> significant performance differences between building flows with
> >>>>>> small messages and large messages in/out of Kafka, so my
> >>>>>> processor buffers up messages to a threshold and flushes those
> >>>>>> as a composite message bundle to improve downstream processing.
> >>>>>> But if this close method is called AFTER the producer has
> >>>>>> already been closed, I have no way to actually flush the final
> >>>>>> composite bundles to my topic on shutdown. Is there some way to
> >>>>>> get a call BEFORE producer shutdown occurs?
> >>>>>> Mike
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>
> >>
> >
--
-- Guozhang
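(A simplified sketch of the buffering pattern Mike describes in his
original message above, against the 0.10-era Processor API; the class
name, threshold, schedule interval, and merge format are invented for
illustration.)

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Simplified sketch of the batching pattern described in this
    // thread; names and values are invented for illustration.
    class BatchingProcessor implements Processor<String, String> {
        private static final int MAX_BATCH = 1000; // hypothetical threshold
        private final List<String> buffer = new ArrayList<>();
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            context.schedule(60000L); // punctuate roughly once a minute
        }

        @Override
        public void process(String key, String value) {
            buffer.add(value);
            if (buffer.size() >= MAX_BATCH) {
                flush();
            }
        }

        @Override
        public void punctuate(long timestamp) {
            flush(); // flush on a schedule so bundles don't go stale
        }

        @Override
        public void close() {
            // The step at issue in this thread: by the time close() runs,
            // the producer has already been closed, so this final bundle
            // is lost.
            flush();
        }

        private void flush() {
            if (buffer.isEmpty()) return;
            context.forward(null, String.join("\n", buffer)); // one merged record
            buffer.clear();
        }
    }

Flushing from close() is exactly the step that is lost once the
producer has been shut down first, which is what this thread is about.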