[
https://issues.apache.org/jira/browse/SAMZA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dengpan Yin updated SAMZA-2308:
-------------------------------
Affects Version/s: 1.2, 1.1, 1.0
> AsyncRunLoop and TieredPriorityChooser fail to be used together
> ---------------------------------------------------------------
>
> Key: SAMZA-2308
> URL: https://issues.apache.org/jira/browse/SAMZA-2308
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.14.0, 0.13.1, 1.0, 1.1, 1.2
> Reporter: Sean McNealy
> Assignee: Dengpan Yin
> Priority: Major
> Attachments: PriorityChooserTest.scala
>
>
> In the single-threaded run loop, a message envelope was replaced using the
> "tryUpdate(ssp)" function each time before calling ".choose()".
> In the AsyncRunLoop that replacement is delayed until the worker is ready and
> the envelope begins processing. This allows the run loop to choose more
> messages to schedule to tasks, which keeps threads busy and allows lower
> priority messages to be scheduled when partitions are available. These are
> good things when threads are available. When a message is chosen for a
> partition that is already processing, it is added to the task's
> pendingEnvelopeQueue so that the run loop can choose yet more messages. But
> the TieredPriorityChooser may then return a message from a
> SystemStreamPartition whose priority is lower than that of another nonempty
> SystemStreamPartition, since there is no way to exclude a partition or
> priority level from the ".choose()" operation.
> In fact, the Chooser object can frequently be exhausted of all messages. For
> a given partition, one message from every nonempty SystemStream ends up in
> the FIFO/ArrayDeque pendingEnvelopeQueue, which does not respect the
> configured priorities, so processing devolves to a round robin policy.
> To reproduce, run a job with the following settings:
> task.inputs=kafka.high-priority,kafka.low-priority
> job.container.thread.pool.size=2
> systems.kafka.streams.high-priority.samza.priority=1
> Expected behavior:
> Each partition will fully read the high-priority stream before reading
> messages from the low-priority stream.
> Observed behavior:
> Each partition reads from both streams as in a round robin policy.
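
The mechanism described in the report can be illustrated with a small
standalone model. This is only a sketch, not Samza code: TieredChooser,
single_threaded_order, and async_order are hypothetical names, the chooser is
assumed to hold at most one envelope per stream, and a single task with two
streams ("high" and "low") stands in for one partition. The only difference
between the two loops is when the chosen stream's slot is refilled.

```python
from collections import deque

PRIORITY_ORDER = ("high", "low")  # highest priority first (assumed two tiers)


class TieredChooser:
    """Toy chooser: holds at most one envelope per stream."""

    def __init__(self, buffers):
        self.buffers = {name: deque(msgs) for name, msgs in buffers.items()}
        self.slots = {}  # stream name -> the single envelope currently offered

    def try_update(self, stream):
        # Refill the stream's slot from its buffer if the slot is empty.
        if stream not in self.slots and self.buffers[stream]:
            self.slots[stream] = self.buffers[stream].popleft()

    def choose(self):
        # Return the envelope from the highest-priority slot that is filled.
        for stream in PRIORITY_ORDER:
            if stream in self.slots:
                return stream, self.slots.pop(stream)
        return None


def single_threaded_order(high, low):
    """Refill the chosen stream's slot before every choose()."""
    chooser = TieredChooser({"high": high, "low": low})
    for stream in PRIORITY_ORDER:
        chooser.try_update(stream)
    order = []
    while (picked := chooser.choose()) is not None:
        stream, envelope = picked
        order.append(envelope)
        chooser.try_update(stream)  # replacement happens right away
    return order


def async_order(high, low):
    """Refill a stream's slot only when its envelope starts processing."""
    chooser = TieredChooser({"high": high, "low": low})
    for stream in PRIORITY_ORDER:
        chooser.try_update(stream)
    pending = deque()  # FIFO, stands in for pendingEnvelopeQueue
    order = []
    while chooser.slots or pending:
        # The run loop keeps choosing until the chooser is exhausted;
        # everything it gets is queued behind the busy task.
        while (picked := chooser.choose()) is not None:
            pending.append(picked)
        # The task starts its next pending envelope; only now is that
        # stream's slot refilled.
        stream, envelope = pending.popleft()
        order.append(envelope)
        chooser.try_update(stream)
    return order


if __name__ == "__main__":
    high = ["h0", "h1", "h2"]
    low = ["l0", "l1", "l2"]
    print("single-threaded:", single_threaded_order(high, low))
    print("async:          ", async_order(high, low))
```

In this model the single-threaded loop drains "high" before touching "low",
while the async loop alternates between the two streams, because the FIFO
pending queue fixes the processing order before the high-priority slot has
been refilled.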