Hey TJ,

Also, is it possible that your upstream Kafka producer (the one sending the real-time messages) is set to async? If so, the messages will be buffered for some period of time before they're sent to Kafka.
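
In case it helps, here is a rough sketch of the producer settings I'd check. It assumes the upstream producer is the Kafka 0.8 producer configured through a properties file (that's a guess on my part), and the values are only illustrative, so please verify the names against the docs for your Kafka version:

```properties
# Assumption: Kafka 0.8 producer properties on the upstream (non-Samza) producer.
# "async" queues messages in a background thread; "sync" sends each one immediately.
producer.type=sync

# These only matter when producer.type=async (illustrative values):
# maximum time, in milliseconds, to buffer messages before sending
#queue.buffering.max.ms=100
# number of messages to collect into one batch before sending
#batch.num.messages=1
```

If the producer does turn out to be async, shrinking the buffering window should show up directly as a smaller delay on the real-time topic.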

Cheers,
Chris

On 7/16/14 9:35 AM, "Chris Riccomini" <[email protected]> wrote:

>Hey TJ,
>
>To debug this, we're going to have to get our hands dirty with some
>metrics. Sorry. :(
>
>The first step is to establish whether the BrokerProxy code in Samza is
>actively getting data for the realtime topic when you're processing batch
>messages. To do this, please turn on the JMX reporter for your job and
>look at this metric:
>
>  %s-%s-messages-read" format (tp.topic, tp.partition))
>
>Even if your job is in the middle of processing batch data, this metric
>should increase when messages are sent to the realtime topic. If it
>increases, then the message is received in realtime, but being buffered
>somewhere in the SamzaContainer before being handed off to your
>StreamTask. If the number does not increase, then the BrokerProxy either
>isn't fetching the realtime data, or the realtime data is being buffered
>somewhere upstream (on the producer, or in the broker).
>
>Once we bisect here, we can continue narrowing down where the problem is.
>
>Cheers,
>Chris
>
>On 7/15/14 12:50 PM, "TJ Giuli" <[email protected]> wrote:
>
>>Hey, Chris:
>>1. Yes, batch size set to 1: systems.kafka.producer.batch.num.messages=1
>>2. Building default chooser with: useBatching=false,
>>useBootstrapping=false, usePriority=true
>>3. I just re-ran my test and the delay is about 16 seconds. When I’m
>>really pumping data through the batch Kafka topics, I’ve seen around 1
>>minute.
>>
>>Thanks!
>>-T
>>
>>On Jul 15, 2014, at 10:11 AM, Chris Riccomini
>><[email protected]> wrote:
>>
>>> Hey TJ,
>>>
>>> This sounds like a bug.
>>>
>>> 1. Is the task.consumer.batch.size config set for your job?
>>> 2. What does this log line say: Building default chooser with:
>>> useBatching=%s, useBootstrapping=%s, usePriority=%s
>>> 3. How long is the delay that you're seeing?
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 7/14/14 4:57 PM, "Yan Fang" <[email protected]> wrote:
>>>
>>>> It seems to me that the batch processing fetches many messages at one
>>>> time and then takes too long to process them. My first thought is that,
>>>> since we have the systems.system-name.samza.fetch.threshold setting
>>>> <http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration-table.html>,
>>>> setting this number to a smaller value (default is 50000) will force
>>>> the system to fetch the messages more frequently.
>>>>
>>>> Any other ideas?
>>>>
>>>> Fang, Yan
>>>> [email protected]
>>>> +1 (206) 849-4108
>>>>
>>>> On Mon, Jul 14, 2014 at 1:12 PM, TJ Giuli <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a stream processor that takes inputs from multiple streams.
>>>>> Some are more batch and not latency-sensitive; others are real-time,
>>>>> infrequently have traffic, and should be low-latency. The real-time
>>>>> stream helps me interpret the batch stream, so I would ideally like
>>>>> any real-time stream envelopes delivered within some maximum latency
>>>>> from the time the message enters a Kafka topic.
>>>>>
>>>>> I have my stream processor configured to prioritize my real-time
>>>>> streams over the batch streams, but I consistently find that the
>>>>> real-time stream is delayed by traffic from the batch stream. From
>>>>> tracing the Kafka consumer, it looks like my stream processor
>>>>> periodically fetches from Kafka, finds that the batch streams have a
>>>>> large chunk of messages waiting, doesn't find anything on the
>>>>> real-time topics, and processes away the batch messages for a few
>>>>> minutes.
>>>>> During the batch processing, the Kafka consumer does not poll the
>>>>> real-time streams, so if a message is sent to a real-time topic, the
>>>>> message effectively doesn't arrive until the next time the Kafka
>>>>> consumer does another fetch. When a real-time message is consumed by
>>>>> the Kafka consumer, the TieredPriorityChooser correctly prioritizes
>>>>> traffic from the real-time streams over the batch streams.
>>>>>
>>>>> Does anyone have recommendations on how to get infrequent but
>>>>> important messages within some maximum time bound? Thanks!
>>>>> -T
