[
https://issues.apache.org/jira/browse/SAMZA-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14069378#comment-14069378
]
TJ Giuli commented on SAMZA-342:
--------------------------------
Hi Chris, I've run several experiments and unfortunately don't have much
of a resolution. Your explanation makes a lot of sense, but when I recompiled
and ran with refreshThreshold set to 20000000 (much larger than the volume of
messages I'm seeing on my bulk topics), I saw little difference from when I
set refreshThreshold to 1.
I'm including the Kafka and Samza logs below. I think they show when Kafka
records a message as appearing on the realtime topic and when it's finally
observed in my stream processor. I ran the experiment three times for each
setting, refreshThreshold = 1 and refreshThreshold = 20000000.
{noformat}
SystemConsumer refreshThreshold=1:
[2014-07-21 14:23:21,797] TRACE Appended message set to log REALTIME-0 with
first offset: 28, next offset: 30, and messages:
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc =
136883696, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=3336
cap=3336]),28), MessageAndOffset(Message(magic = 0, attributes = 0, crc =
2978032049, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1647
cap=1647]),29)) (kafka.log.Log)
2014-07-21 14:23:36 TaskInstance [TRACE] Processing incoming message envelope
for partition: Partition [partition=0], SystemStreamPartition
[partition=Partition [partition=0], system=kafka, stream=REALTIME]
**************
[2014-07-21 14:26:53,210] TRACE Appended message set to log REALTIME-0 with
first offset: 30, next offset: 32, and messages:
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc =
4093869568, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=3555
cap=3555]),30), MessageAndOffset(Message(magic = 0, attributes = 0, crc =
400086399, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1647
cap=1647]),31)) (kafka.log.Log)
2014-07-21 14:27:07 TaskInstance [TRACE] Processing incoming message envelope
for partition: Partition [partition=0], SystemStreamPartition
[partition=Partition [partition=0], system=kafka, stream=REALTIME]
***************
[2014-07-21 14:29:38,453] TRACE Appended message set to log REALTIME-0 with
first offset: 32, next offset: 34, and messages:
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc =
2001288351, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=3774
cap=3774]),32), MessageAndOffset(Message(magic = 0, attributes = 0, crc =
492894339, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1647
cap=1647]),33)) (kafka.log.Log)
2014-07-21 14:29:53 TaskInstance [TRACE] Processing incoming message envelope
for partition: Partition [partition=0], SystemStreamPartition
[partition=Partition [partition=0], system=kafka, stream=REALTIME]
SystemConsumer refreshThreshold=20000000:
[2014-07-21 14:34:14,468] TRACE Appended message set to log REALTIME-0 with
first offset: 34, next offset: 36, and messages:
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc =
2045673790, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=3993
cap=3993]),34), MessageAndOffset(Message(magic = 0, attributes = 0, crc =
3101088091, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1647
cap=1647]),35)) (kafka.log.Log)
2014-07-21 14:34:29 TaskInstance [TRACE] Processing incoming message envelope
for partition: Partition [partition=0], SystemStreamPartition
[partition=Partition [partition=0], system=kafka, stream=REALTIME]
**********************
[2014-07-21 14:39:27,711] TRACE Appended message set to log REALTIME-0 with
first offset: 36, next offset: 38, and messages:
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc =
1761075894, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=4212
cap=4212]),36), MessageAndOffset(Message(magic = 0, attributes = 0, crc =
3315050524, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1647
cap=1647]),37)) (kafka.log.Log)
2014-07-21 14:39:42 TaskInstance [TRACE] Processing incoming message envelope
for partition: Partition [partition=0], SystemStreamPartition
[partition=Partition [partition=0], system=kafka, stream=REALTIME]
****************************
[2014-07-21 14:42:40,181] TRACE Appended message set to log REALTIME-0 with
first offset: 38, next offset: 40, and messages:
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc =
2267447584, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=4431
cap=4431]),38), MessageAndOffset(Message(magic = 0, attributes = 0, crc =
3484175016, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1647
cap=1647]),39)) (kafka.log.Log)
2014-07-21 14:42:51 TaskInstance [TRACE] Processing incoming message envelope
for partition: Partition [partition=0], SystemStreamPartition
[partition=Partition [partition=0], system=kafka, stream=REALTIME]
{noformat}
Strange!
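To put numbers on "little difference", here is a small sketch that subtracts the Kafka append time from the TaskInstance processing time for each of the six runs above. The timestamps are copied from the logs; the names `RUNS`, `parse`, and `delays` are just for illustration, and it assumes the broker and the Samza container clocks are in sync. The TaskInstance lines only have one-second resolution, so each delay is accurate to about a second.

```python
from datetime import datetime

# (Kafka append time, TaskInstance processing time) pairs copied from the
# logs above, keyed by the refreshThreshold used for that run.
RUNS = {
    1: [
        ("14:23:21.797", "14:23:36"),
        ("14:26:53.210", "14:27:07"),
        ("14:29:38.453", "14:29:53"),
    ],
    20000000: [
        ("14:34:14.468", "14:34:29"),
        ("14:39:27.711", "14:39:42"),
        ("14:42:40.181", "14:42:51"),
    ],
}


def parse(ts):
    """Parse a time of day, with or without a fractional-second part."""
    fmt = "%H:%M:%S.%f" if "." in ts else "%H:%M:%S"
    return datetime.strptime(ts, fmt)


def delays(pairs):
    """Seconds between the Kafka append and the TaskInstance trace line."""
    return [
        round((parse(seen) - parse(appended)).total_seconds(), 3)
        for appended, seen in pairs
    ]


if __name__ == "__main__":
    for threshold, pairs in RUNS.items():
        print(threshold, delays(pairs))
```

That works out to roughly 14.2 s, 13.8 s and 14.5 s with refreshThreshold = 1, and roughly 14.5 s, 14.3 s and 10.8 s with refreshThreshold = 20000000.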
> Priority streams experience large latencies before being consumed by the
> stream processor
> -----------------------------------------------------------------------------------------
>
> Key: SAMZA-342
> URL: https://issues.apache.org/jira/browse/SAMZA-342
> Project: Samza
> Issue Type: Bug
> Components: kafka
> Affects Versions: 0.7.0
> Environment: ubuntu 13.10
> Reporter: TJ Giuli
>
> I have a stream processor that takes inputs from multiple streams. Some are
> batch-oriented and not latency-sensitive; others are real-time, carry
> infrequent traffic, and should be low-latency. The real-time stream helps me
> interpret the batch stream, so I would ideally like any real-time stream
> envelope delivered within some maximum latency from the time the message
> enters a Kafka topic.
> I have my stream processor configured to prioritize my real-time streams over
> the batch streams, but I consistently find that the real-time stream is
> delayed by traffic from the batch stream. From tracing the Kafka consumer,
> it looks like my stream processor periodically fetches from Kafka, finds that
> the batch streams have a large chunk of messages waiting, doesn’t find
> anything on the real-time topics, and processes away the batch messages for a
> few minutes. During the batch processing, the Kafka consumer does not poll
> the real-time streams, so if a message is sent to a real-time topic, the
> message effectively doesn’t arrive until the next time the Kafka consumer
> does another fetch. When a real-time message is consumed by the Kafka
> consumer, the TieredPriorityChooser correctly prioritizes traffic from the
> real-time streams over the batch streams.
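
The behavior described in the issue can be illustrated with a toy model. This is only a sketch of the description above, not Samza's or Kafka's actual consumer code: the function name `realtime_wait_ms`, its parameters, and all the numbers are hypothetical. It assumes a single fetch at time 0 that buffers a chunk of batch messages, and asks how long a real-time message that arrives mid-chunk waits before the consumer sees it.

```python
def realtime_wait_ms(arrival_ms, chunk_size, cost_ms, poll_every=None):
    """Toy model: how long a real-time message waits before it is seen.

    At t=0 the consumer fetches and buffers `chunk_size` batch messages, then
    processes them one at a time at `cost_ms` each. A real-time message
    arrives at `arrival_ms`. With poll_every=None the consumer fetches again
    only once the buffer is drained; otherwise it also polls the real-time
    topic after every `poll_every` processed messages.
    """
    clock_ms = 0
    for processed in range(1, chunk_size + 1):
        clock_ms += cost_ms  # process one buffered batch message
        polled = poll_every is not None and processed % poll_every == 0
        if polled and clock_ms >= arrival_ms:
            return clock_ms - arrival_ms  # seen at an intermediate poll
    # Buffer drained: the next fetch happens now and picks up the message.
    return max(clock_ms - arrival_ms, 0)
```

With made-up numbers, `realtime_wait_ms(1005, 1500, 10)` returns 13995, so the message sits for the rest of the batch chunk, while `realtime_wait_ms(1005, 1500, 10, poll_every=100)` returns 995. In this model the wait is bounded by the polling interval rather than by the size of the buffered chunk.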