[ https://issues.apache.org/jira/browse/SAMZA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16912783#comment-16912783 ]
Sean McNealy commented on SAMZA-2308:
-------------------------------------
Attached a test that runs against 0.13.1.
> AsyncRunLoop and TieredPriorityChooser fail to be used together
> ---------------------------------------------------------------
>
> Key: SAMZA-2308
> URL: https://issues.apache.org/jira/browse/SAMZA-2308
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.14.0, 0.13.1
> Reporter: Sean McNealy
> Priority: Major
> Attachments: PriorityChooserTest.scala
>
>
> In the single-threaded run loop, a chosen message envelope was replaced
> using the "tryUpdate(ssp)" function each time before calling ".choose()".
> In the AsyncRunLoop, that replacement is delayed until the callback
> completes. This lets the run loop choose more messages to schedule to tasks,
> which keeps threads busy and allows lower-priority messages to be scheduled
> when partitions are available. These are good things when threads are
> available. When a message is chosen for a partition that is already
> processing, it is added to the task's pendingEnvelopeQueue so that the run
> loop can choose yet more messages. But the TieredPriorityChooser may return
> a message from a SystemStreamPartition whose priority is lower than that of
> another nonempty SystemStreamPartition, since there is no way to exclude a
> partition or priority level from the ".choose()" operation.
> In fact, the Chooser object can frequently be exhausted of all messages.
> For a given partition, one message from every nonempty SystemStream ends up
> in the FIFO (ArrayDeque) pendingEnvelopeQueue, which does not respect the
> configured priorities, so processing devolves to a round-robin policy.
> To reproduce, run a job with the following settings:
> task.inputs=kafka.high-priority,kafka.low-priority
> job.container.thread.pool.size=2
> systems.kafka.streams.high-priority.samza.priority=1
> Expected behavior:
> Each partition will fully read the high-priority stream before reading
> messages from the low-priority stream.
> Observed behavior:
> Each partition reads from both streams as in a round robin policy.
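
The interaction described above can be illustrated with a small standalone model. This is only a sketch under simplifying assumptions, not Samza code: the classes and methods below (PriorityDevolveSketch, Chooser, backlog, syncOrder, asyncOrder) are hypothetical stand-ins, a single partition with one high-priority and one low-priority stream is assumed, and the chooser is assumed to hold at most one envelope per stream. It contrasts refilling the chooser before every choose() with refilling only after a callback completes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified, hypothetical model of the behavior in this issue; not Samza code.
class PriorityDevolveSketch {
    static final int LOW = 0;
    static final int HIGH = 1;

    // Stand-in for a tiered chooser: at most one buffered envelope per stream,
    // and choose() returns the envelope of the highest-priority occupied slot.
    static final class Chooser {
        private final String[] slot = new String[2];

        void update(int stream, String envelope) { slot[stream] = envelope; }

        String choose() {
            for (int s = HIGH; s >= LOW; s--) {
                if (slot[s] != null) {
                    String chosen = slot[s];
                    slot[s] = null;
                    return chosen;
                }
            }
            return null; // chooser is exhausted
        }
    }

    static Deque<String> backlog(String prefix, int n) {
        Deque<String> q = new ArrayDeque<>();
        for (int i = 0; i < n; i++) q.add(prefix + i);
        return q;
    }

    static List<Deque<String>> streams(int n) {
        List<Deque<String>> s = new ArrayList<>();
        s.add(backlog("L", n)); // index LOW
        s.add(backlog("H", n)); // index HIGH
        return s;
    }

    static int streamOf(String envelope) {
        return envelope.charAt(0) == 'H' ? HIGH : LOW;
    }

    // Refill the chooser slot for one stream from its backlog, if any remains.
    static void tryUpdate(Chooser chooser, List<Deque<String>> streams, int stream) {
        String next = streams.get(stream).poll();
        if (next != null) chooser.update(stream, next);
    }

    // Single-threaded model: the chosen envelope is replaced before the next choose().
    static List<String> syncOrder(int n) {
        List<Deque<String>> streams = streams(n);
        Chooser chooser = new Chooser();
        tryUpdate(chooser, streams, LOW);
        tryUpdate(chooser, streams, HIGH);
        List<String> processed = new ArrayList<>();
        String e;
        while ((e = chooser.choose()) != null) {
            processed.add(e);
            tryUpdate(chooser, streams, streamOf(e));
        }
        return processed;
    }

    // Async model: the loop keeps choosing into a FIFO pending queue, and a
    // stream's slot is refilled only after its envelope's callback completes.
    static List<String> asyncOrder(int n) {
        List<Deque<String>> streams = streams(n);
        Chooser chooser = new Chooser();
        tryUpdate(chooser, streams, LOW);
        tryUpdate(chooser, streams, HIGH);
        Deque<String> pending = new ArrayDeque<>();
        List<String> processed = new ArrayList<>();
        while (true) {
            String e;
            while ((e = chooser.choose()) != null) pending.add(e); // drains the chooser
            if (pending.isEmpty()) break;
            String done = pending.poll();           // task processes the FIFO head
            processed.add(done);
            tryUpdate(chooser, streams, streamOf(done)); // delayed replacement
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + syncOrder(3));
        System.out.println("async: " + asyncOrder(3));
    }
}
```

In this model the synchronous order finishes every high-priority envelope before any low-priority one, while the asynchronous order alternates between the two streams, which matches the observed behavior reported above. The real AsyncRunLoop has more moving parts (multiple tasks, thread pool, concurrency limits), so this should be read as an illustration of the mechanism rather than a reproduction.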
--
This message was sent by Atlassian Jira
(v8.3.2#803003)