Hi Chris,

There may be some latency between the producer and Kafka, but there still appears to be a significant delay between when Kafka logs the receipt of a new message committed to a real-time topic and when the Samza stream processor receives it (11 seconds in my latest test run).

Thanks,
—T
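Chris's first diagnostic step in the quoted thread is to turn on the JMX reporter and watch the per-partition `messages-read` metric for the real-time topic. A minimal job-config sketch for that follows. It assumes the Samza 0.7 `JmxReporterFactory` class. The reporter name `jmx` is an arbitrary label chosen here.

```properties
# Register a metrics reporter under the (arbitrary) name "jmx".
metrics.reporters=jmx
# Use Samza's JMX reporter so the container's metrics are visible as MBeans.
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
```

The metric name follows the format string Chris quoted (`"%s-%s-messages-read" format (tp.topic, tp.partition)`). For a hypothetical real-time topic `rt-events`, partition 0, you would look for `rt-events-0-messages-read` in a JMX client such as JConsole. Then check whether it increments while the job is busy with batch data.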
On Jul 16, 2014, at 9:40 AM, Chris Riccomini <[email protected]> wrote:

> Hey TJ,
>
> Also, is it possible that your upstream Kafka producer (the one sending
> the RT messages) is set to async? In that case, the messages will be
> buffered for some period of time before they're sent to Kafka.
>
> Cheers,
> Chris
>
> On 7/16/14 9:35 AM, "Chris Riccomini" <[email protected]> wrote:
>
>> Hey TJ,
>>
>> To debug this, we're going to have to get our hands dirty with some
>> metrics. Sorry. :(
>>
>> The first step is to establish whether the BrokerProxy code in Samza is
>> actively getting data for the realtime topic while you're processing
>> batch messages. To do this, please turn on the JMX reporter for your job
>> and look at this metric:
>>
>> "%s-%s-messages-read" format (tp.topic, tp.partition)
>>
>> Even if your job is in the middle of processing batch data, this metric
>> should increase when messages are sent to the realtime topic. If it
>> increases, then the message is received in realtime but is being
>> buffered somewhere in the SamzaContainer before being handed off to your
>> StreamTask. If the number does not increase, then the BrokerProxy either
>> isn't fetching the realtime data, or the realtime data is being buffered
>> somewhere upstream (on the producer, or in the broker).
>>
>> Once we bisect here, we can continue narrowing down where the problem is.
>>
>> Cheers,
>> Chris
>>
>> On 7/15/14 12:50 PM, "TJ Giuli" <[email protected]> wrote:
>>
>>> Hey, Chris:
>>> 1. Yes, batch size set to 1: systems.kafka.producer.batch.num.messages=1
>>> 2. Building default chooser with: useBatching=false,
>>>    useBootstrapping=false, usePriority=true
>>> 3. I just re-ran my test and the delay is about 16 seconds. When I’m
>>>    really pumping data through the batch Kafka topics, I’ve seen around
>>>    1 minute.
>>>
>>> Thanks!
>>> —T
>>>
>>> On Jul 15, 2014, at 10:11 AM, Chris Riccomini
>>> <[email protected]> wrote:
>>>
>>>> Hey TJ,
>>>>
>>>> This sounds like a bug.
>>>>
>>>> 1. Is the task.consumer.batch.size config set for your job?
>>>> 2. What does this log line say: Building default chooser with:
>>>>    useBatching=%s, useBootstrapping=%s, usePriority=%s
>>>> 3. How long is the delay that you're seeing?
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On 7/14/14 4:57 PM, "Yan Fang" <[email protected]> wrote:
>>>>
>>>>> It seems to me that the batch processing fetches many messages at
>>>>> one time and then takes too long to process them. My first thought
>>>>> is that, since we have systems.system-name.samza.fetch.threshold
>>>>> <http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration-table.html>,
>>>>> setting this number to a smaller value (the default is 50000) will
>>>>> force the system to fetch messages more frequently.
>>>>>
>>>>> Any other ideas?
>>>>>
>>>>> Fang, Yan
>>>>> [email protected]
>>>>> +1 (206) 849-4108
>>>>>
>>>>> On Mon, Jul 14, 2014 at 1:12 PM, TJ Giuli <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a stream processor that takes inputs from multiple streams:
>>>>>> some are batch-oriented and not latency-sensitive, while others are
>>>>>> real-time, carry infrequent traffic, and should be low-latency. The
>>>>>> real-time stream helps me interpret the batch stream, so ideally I
>>>>>> would like any real-time stream envelopes delivered within some
>>>>>> maximum latency from the time the message enters a Kafka topic.
>>>>>>
>>>>>> I have my stream processor configured to prioritize my real-time
>>>>>> streams over the batch streams, but I consistently find that the
>>>>>> real-time stream is delayed by traffic from the batch stream. From
>>>>>> tracing the Kafka consumer, it looks like my stream processor
>>>>>> periodically fetches from Kafka, finds that the batch streams have a
>>>>>> large chunk of messages waiting, doesn't find anything on the
>>>>>> real-time topics, and then processes the batch messages for a few
>>>>>> minutes. During the batch processing, the Kafka consumer does not
>>>>>> poll the real-time streams, so if a message is sent to a real-time
>>>>>> topic, it effectively doesn't arrive until the next time the Kafka
>>>>>> consumer does a fetch. Once a real-time message is consumed by the
>>>>>> Kafka consumer, the TieredPriorityChooser correctly prioritizes
>>>>>> traffic from the real-time streams over the batch streams.
>>>>>>
>>>>>> Does anyone have recommendations on how to get infrequent but
>>>>>> important messages delivered within some maximum time bound? Thanks!
>>>>>> —T
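For reference, the two settings discussed in this thread could be combined in a job config roughly as follows. This is a sketch only. The stream names `rt-events` and `batch-events` are hypothetical. The threshold of 10000 is an illustrative value below the 50000 default Yan mentions, not a tuned recommendation.

```properties
# Hypothetical inputs: a low-volume real-time topic and a high-volume batch topic.
task.inputs=kafka.rt-events,kafka.batch-events

# Give the real-time stream a priority so the TieredPriorityChooser hands its
# envelopes to the StreamTask ahead of streams with no priority set.
systems.kafka.streams.rt-events.samza.priority=1

# Target fewer buffered messages across all partitions the container consumes
# (default 50000). Per Yan's suggestion, this should make the consumer fetch
# more often. Illustrative value only.
systems.kafka.samza.fetch.threshold=10000
```

The priority setting only orders envelopes that have already reached the container. That is why the thread focuses on whether the real-time messages are being fetched at all.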
