Hi, Chris, there may be some latency between the producer and Kafka, but it
looks like there is still a significant delay between when Kafka logs the
receipt of a new message committed to a real-time topic and when the Samza
stream processor receives it (11 seconds in my latest test run).
Thanks,
—T

On Jul 16, 2014, at 9:40 AM, Chris Riccomini <[email protected]> 
wrote:

> Hey TJ,
> 
> Also, is it possible that your upstream Kafka producer (the one sending
> the RT messages) is set to async? In such a case, the messages will be
> buffered for some period of time before they're sent to Kafka.
> 
> Cheers,
> Chris
> 
> On 7/16/14 9:35 AM, "Chris Riccomini" <[email protected]> wrote:
> 
>> Hey TJ,
>> 
>> To debug this, we're going to have to get our hands dirty with some
>> metrics. Sorry. :(
>> 
>> The first step is to establish whether the BrokerProxy code in Samza is
>> actively getting data for the realtime topic when you're processing batch
>> messages. To do this, please turn on the JMX reporter for your job and
>> look at this metric:
>> 
>> "%s-%s-messages-read" format (tp.topic, tp.partition)
>> 
>> Even if your job is in the middle of processing batch data, this metric
>> should increase when messages are sent to the realtime topic. If it
>> increases, then the message is received in realtime, but is being buffered
>> somewhere in the SamzaContainer before being handed off to your
>> StreamTask. If the number does not increase, then the BrokerProxy either
>> isn't fetching the realtime data, or the realtime data is being buffered
>> somewhere upstream (on the producer, or in the broker).
>> 
>> Once we bisect here, we can continue narrowing down where the problem is.
>> 
>> Cheers,
>> Chris
>> 
>> On 7/15/14 12:50 PM, "TJ Giuli" <[email protected]> wrote:
>> 
>>> Hey, Chris:
>>> 1. Yes, batch size set to 1: systems.kafka.producer.batch.num.messages=1
>>> 2. Building default chooser with: useBatching=false,
>>>    useBootstrapping=false, usePriority=true
>>> 3. I just re-ran my test and the delay is about 16 seconds.  When I’m
>>>    really pumping data through the batch Kafka topics, I’ve seen around 1
>>>    minute.
>>> 
>>> Thanks!
>>> —T
>>> 
>>> On Jul 15, 2014, at 10:11 AM, Chris Riccomini
>>> <[email protected]> wrote:
>>> 
>>>> Hey TJ,
>>>> 
>>>> This sounds like a bug.
>>>> 
>>>> 1. Is the task.consumer.batch.size config set for your job?
>>>> 2. What does this log line say: Building default chooser with:
>>>> useBatching=%s, useBootstrapping=%s, usePriority=%s
>>>> 3. How long is the delay that you're seeing?
>>>> 
>>>> Cheers,
>>>> Chris
>>>> 
>>>> On 7/14/14 4:57 PM, "Yan Fang" <[email protected]> wrote:
>>>> 
>>>>> It seems to me that the batch processing fetches many messages at one
>>>>> time and then takes too long to process them. My first thought is that,
>>>>> since we have systems.system-name.samza.fetch.threshold
>>>>> <http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration-table.html>,
>>>>> setting this number to a smaller value (the default is 50000) will force
>>>>> the system to fetch messages more frequently.
>>>>> 
>>>>> Any other ideas?
>>>>> 
>>>>> Fang, Yan
>>>>> [email protected]
>>>>> +1 (206) 849-4108
>>>>> 
>>>>> 
>>>>> On Mon, Jul 14, 2014 at 1:12 PM, TJ Giuli <[email protected]>
>>>>> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> I have a stream processor that takes input from multiple streams.  Some
>>>>>> are batch-oriented and not latency-sensitive; others are real-time, carry
>>>>>> traffic only infrequently, and should be low-latency.  The real-time
>>>>>> stream helps me interpret the batch stream, so I would ideally like any
>>>>>> real-time stream envelopes delivered within some maximum latency from the
>>>>>> time the message enters a Kafka topic.
>>>>>> 
>>>>>> I have my stream processor configured to prioritize my real-time streams
>>>>>> over the batch streams, but I consistently find that the real-time stream
>>>>>> is delayed by traffic from the batch stream.  From tracing the Kafka
>>>>>> consumer, it looks like my stream processor periodically fetches from
>>>>>> Kafka, finds that the batch streams have a large chunk of messages
>>>>>> waiting, doesn't find anything on the real-time topics, and processes away
>>>>>> the batch messages for a few minutes.  During the batch processing, the
>>>>>> Kafka consumer does not poll the real-time streams, so if a message is
>>>>>> sent to a real-time topic, the message effectively doesn't arrive until
>>>>>> the next time the Kafka consumer does another fetch.  When a real-time
>>>>>> message is consumed by the Kafka consumer, the TieredPriorityChooser
>>>>>> correctly prioritizes traffic from the real-time streams over the batch
>>>>>> streams.
>>>>>> 
>>>>>> Does anyone have recommendations on how to get infrequent but important
>>>>>> messages within some maximum time bound?  Thanks!
>>>>>> -T
>>>> 
>>> 
>> 
> 
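For reference, the bisect step Chris describes (sample the messages-read counter for the real-time topic, send a message, sample again) can be sketched roughly as below. This is only an illustration: the helper names, the topic name "rt-events", and the sample values are made up, and the full name the JMX reporter exposes may carry additional prefixes.

```python
# Sketch only: helper names and sample values are illustrative, not Samza APIs.

def metric_name(topic: str, partition: int) -> str:
    """Build a counter name following the "%s-%s-messages-read" pattern."""
    return "%s-%s-messages-read" % (topic, partition)


def bisect_delay(reads_before: int, reads_after: int) -> str:
    """Classify where a delayed real-time message is likely sitting.

    reads_before and reads_after are two samples of the messages-read
    counter, taken just before and shortly after a message is produced to
    the real-time topic while the job is busy with batch data.
    """
    if reads_after < reads_before:
        raise ValueError("counter went backwards; was the job restarted?")
    if reads_after > reads_before:
        # The counter moved, so the message was fetched and the wait is
        # somewhere inside the container before the StreamTask sees it.
        return "buffered in SamzaContainer"
    # The counter did not move: nothing was fetched in the sampling window.
    return "not fetched (BrokerProxy, producer, or broker)"


print(metric_name("rt-events", 0))   # hypothetical topic, partition 0
print(bisect_delay(120, 121))
print(bisect_delay(120, 120))
```

The sampling window matters here: if the second sample is taken too early, a message that is merely slow to arrive looks the same as one that was never fetched.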

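The behavior in the original report (priority works, but only once a real-time message has actually been fetched) can be illustrated with a toy model. This is not Samza's TieredPriorityChooser; the class, its methods, and the stream names are hypothetical stand-ins that only mimic the idea of choosing the highest-priority stream among messages already buffered.

```python
from collections import deque


class ToyPriorityChooser:
    """Toy stand-in for a priority chooser (hypothetical, not Samza's class)."""

    def __init__(self, priorities):
        # priorities maps stream name -> int; a higher number wins.
        self.priorities = priorities
        self.buffers = {stream: deque() for stream in priorities}

    def update(self, stream, message):
        """Called when the consumer has fetched a message for a stream."""
        self.buffers[stream].append(message)

    def choose(self):
        """Return (stream, message) from the highest-priority non-empty buffer."""
        ready = [s for s, q in self.buffers.items() if q]
        if not ready:
            return None
        best = max(ready, key=lambda s: self.priorities[s])
        return best, self.buffers[best].popleft()


chooser = ToyPriorityChooser({"realtime": 1, "batch": 0})
for i in range(3):
    chooser.update("batch", "b%d" % i)

# A real-time message may already be in Kafka, but it has not been fetched,
# so the chooser can only hand out batch messages.
first = chooser.choose()

# Once the consumer fetches it, the real-time message jumps the queue.
chooser.update("realtime", "r0")
second = chooser.choose()
print(first, second)
```

In this model the chooser never sees a message until `update` is called, which is why the delay has to be addressed on the fetch side rather than by adjusting priorities.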