[ https://issues.apache.org/jira/browse/SAMZA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean McNealy updated SAMZA-2308:
--------------------------------
    Description: 
In the single-threaded run loop, a message envelope was replaced using the
"tryUpdate(ssp)" function each time before calling ".choose()".

In the AsyncRunLoop, that replacement is delayed until the worker is ready and
the envelope begins processing. This lets the run loop choose more messages to
schedule to tasks, which keeps threads busy and allows lower-priority messages
to be scheduled when their partitions are available. That is a good thing when
threads are available. When a message is chosen for a partition that is already
processing, it is added to the task's pendingEnvelopeQueue so that the run loop
can choose yet more messages. But the TieredPriorityChooser may then return a
message from a lower-priority SystemStreamPartition even while a
higher-priority SystemStreamPartition is nonempty, since there is no way to
exclude a partition or priority level from the ".choose()" operation.

In fact, the Chooser object can frequently be exhausted of all messages. For a
given partition, the pendingEnvelopeQueue ends up holding one message from
every nonempty SystemStream. Because that queue is a FIFO ArrayDeque, it does
not respect the configured priority settings, so processing devolves to a
round-robin policy.
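The round-robin outcome can be reproduced with a small, self-contained model. This is a hypothetical sketch, not Samza's MessageChooser, TieredPriorityChooser, or AsyncRunLoop code: ToyTieredChooser, the tryUpdate helper, and the two run methods are invented for illustration. The model assumes one partition with two streams ("H" standing in for kafka.high-priority at priority 1, "L" for kafka.low-priority at priority 0), a chooser that buffers at most one envelope per stream, and an async loop that keeps choosing into a FIFO queue whenever the chooser has something to offer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of one partition with two input streams; not Samza's actual API.
// "H" stands in for kafka.high-priority (priority 1), "L" for kafka.low-priority (priority 0).
public class PriorityOrderSketch {

    // Buffers at most one envelope per stream and returns the highest-priority one.
    static final class ToyTieredChooser {
        private final Map<String, Integer> priority = Map.of("H", 1, "L", 0);
        private final Map<String, String> buffered = new LinkedHashMap<>();

        void update(String stream, String envelope) {
            buffered.put(stream, envelope);
        }

        // Removes and returns the highest-priority buffered envelope, or null if none.
        String choose() {
            String best = null;
            for (String stream : buffered.keySet()) {
                if (best == null || priority.get(stream) > priority.get(best)) {
                    best = stream;
                }
            }
            return best == null ? null : buffered.remove(best);
        }
    }

    private final Map<String, Deque<String>> backlog = new LinkedHashMap<>();
    private final ToyTieredChooser chooser = new ToyTieredChooser();

    private PriorityOrderSketch(int messagesPerStream) {
        for (String stream : List.of("H", "L")) {
            Deque<String> messages = new ArrayDeque<>();
            for (int i = 1; i <= messagesPerStream; i++) {
                messages.add(stream + i);
            }
            backlog.put(stream, messages);
            tryUpdate(stream); // seed the chooser with one envelope per stream
        }
    }

    // Hypothetical stand-in for tryUpdate(ssp): refill the chooser from the stream's backlog.
    private void tryUpdate(String stream) {
        String next = backlog.get(stream).poll();
        if (next != null) {
            chooser.update(stream, next);
        }
    }

    private static String streamOf(String envelope) {
        return envelope.substring(0, 1); // "H3" -> "H"
    }

    // Single-threaded loop: replace the chosen envelope before the next choose().
    static List<String> runSingleThreaded(int messagesPerStream) {
        PriorityOrderSketch p = new PriorityOrderSketch(messagesPerStream);
        List<String> processed = new ArrayList<>();
        String envelope;
        while ((envelope = p.chooser.choose()) != null) {
            p.tryUpdate(streamOf(envelope));
            processed.add(envelope); // processed synchronously
        }
        return processed;
    }

    // Async loop: keep choosing into a FIFO pending queue; refill only when an envelope starts processing.
    static List<String> runAsyncWithFifoPending(int messagesPerStream) {
        PriorityOrderSketch p = new PriorityOrderSketch(messagesPerStream);
        Deque<String> pendingEnvelopeQueue = new ArrayDeque<>();
        List<String> processed = new ArrayList<>();
        while (true) {
            String chosen;
            while ((chosen = p.chooser.choose()) != null) {
                pendingEnvelopeQueue.addLast(chosen); // partition busy: queue it, ignoring priority
            }
            String next = pendingEnvelopeQueue.pollFirst();
            if (next == null) {
                return processed;
            }
            p.tryUpdate(streamOf(next)); // deferred replacement
            processed.add(next);
        }
    }

    public static void main(String[] args) {
        System.out.println(runSingleThreaded(3));       // [H1, H2, H3, L1, L2, L3]
        System.out.println(runAsyncWithFifoPending(3)); // [H1, L1, H2, L2, H3, L3]
    }
}
```

In this model the single-threaded loop drains the high-priority stream first, while the deferred refill plus FIFO pending queue alternates between streams, matching the expected and observed behavior described in this report.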

To reproduce, run a job with the following settings:
task.inputs=kafka.high-priority,kafka.low-priority
job.container.thread.pool.size=2
systems.kafka.streams.high-priority.samza.priority=1

Expected behavior:
Each partition will fully read the high-priority stream before reading messages 
from the low-priority stream.

Observed behavior:
Each partition reads from both streams, as under a round-robin policy.

  was:
In the single-threaded run loop, a message envelope was replaced using the
"tryUpdate(ssp)" function each time before calling ".choose()".

In the AsyncRunLoop, that replacement is delayed until the callback completes.
This lets the run loop choose more messages to schedule to tasks, which keeps
threads busy and allows lower-priority messages to be scheduled when their
partitions are available. That is a good thing when threads are available.
When a message is chosen for a partition that is already processing, it is
added to the task's pendingEnvelopeQueue so that the run loop can choose yet
more messages. But the TieredPriorityChooser may then return a message from a
lower-priority SystemStreamPartition even while a higher-priority
SystemStreamPartition is nonempty, since there is no way to exclude a
partition or priority level from the ".choose()" operation.

In fact, the Chooser object can frequently be exhausted of all messages. For a
given partition, the pendingEnvelopeQueue ends up holding one message from
every nonempty SystemStream. Because that queue is a FIFO ArrayDeque, it does
not respect the configured priority settings, so processing devolves to a
round-robin policy.

To reproduce, run a job with the following settings:
task.inputs=kafka.high-priority,kafka.low-priority
job.container.thread.pool.size=2
systems.kafka.streams.high-priority.samza.priority=1

Expected behavior:
Each partition will fully read the high-priority stream before reading messages 
from the low-priority stream.

Observed behavior:
Each partition reads from both streams, as under a round-robin policy.


> AsyncRunLoop and TieredPriorityChooser fail to be used together
> ---------------------------------------------------------------
>
>                 Key: SAMZA-2308
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2308
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.14.0, 0.13.1
>            Reporter: Sean McNealy
>            Priority: Major
>         Attachments: PriorityChooserTest.scala



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
