[
https://issues.apache.org/jira/browse/SAMZA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923821#comment-16923821
]
Sean McNealy commented on SAMZA-2308:
-------------------------------------
Our workaround is to avoid multithreaded scheduling for our jobs that have a
priority stream and a batch stream. This is fine for us; we've considered the
thread pool somewhat experimental since it came out, though it is really fast
and cool. We designed for, and can still use, the original Samza
multi-container model when we want faster processing, running a single thread
in each container.
We only noticed the problem because our task has both a streaming mode and a
batch mode. The streaming mode is high priority and easily keeps up with the
input stream rate since it hits a cache. The batch mode is low priority because
it takes much longer, as it misses the cache. We set priorities on the streams,
but noticed that once we added threads we started lagging behind on the
high-priority live streams. I mention this only to show that bootstrap streams
would help remove any lag at each startup, but would return us to the wrong
(round robin) prioritization once the live streams had caught up.
It seems like a really hard problem to fix. The old container-based model
inherently knew which tasks/partitions were available to choose from, since
anything else would be scheduled by another container, and containers don't
share the load of several hot-spot partitions the way a thread pool can. So a
single Chooser object per container or thread works, but there are trade-offs
to tying a task to a thread, especially when fewer threads than partitions are
configured. And a Chooser object that can dynamically respond with a message
from a subset of partitions might be more fragile and computationally
intensive than desired.
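To make that last point concrete, here is a minimal sketch of what a subset-aware chooser might look like. It assumes a hypothetical interface: Samza's real MessageChooser has no way to exclude partitions (which is the crux of this issue), so `SubsetAwareChooser`, `Envelope`, and `choose(Predicate)` are illustrative names only. The caller says which partitions are currently idle, and the chooser skips buffered envelopes for busy partitions instead of handing back a lower-priority one.

```java
import java.util.*;
import java.util.function.Predicate;

// Hypothetical sketch: Samza's MessageChooser has no choose(filter) method.
class SubsetChooserDemo {
  static final class Envelope {
    final String stream;
    final int partition;
    final int priority;

    Envelope(String stream, int partition, int priority) {
      this.stream = stream;
      this.partition = partition;
      this.priority = priority;
    }

    @Override
    public String toString() { return stream + "/" + partition; }
  }

  // Buffers envelopes like an ordinary chooser, but lets the caller restrict
  // which partitions are eligible for the next choice.
  static final class SubsetAwareChooser {
    private final List<Envelope> buffered = new ArrayList<>();

    void update(Envelope e) { buffered.add(e); }

    // Linear scan over everything buffered on every call: this is the extra
    // per-choice cost that makes the approach unattractive.
    Envelope choose(Predicate<Integer> partitionEligible) {
      Envelope best = null;
      for (Envelope e : buffered) {
        if (!partitionEligible.test(e.partition)) continue;
        if (best == null || e.priority > best.priority) best = e;
      }
      if (best != null) buffered.remove(best);
      return best;
    }
  }

  public static void main(String[] args) {
    SubsetAwareChooser chooser = new SubsetAwareChooser();
    chooser.update(new Envelope("high-priority", 0, 1));
    chooser.update(new Envelope("low-priority", 0, 0));
    chooser.update(new Envelope("low-priority", 1, 0));

    Set<Integer> busy = new HashSet<>(Arrays.asList(0)); // partition 0 is processing
    System.out.println(chooser.choose(p -> !busy.contains(p))); // prints low-priority/1
    busy.clear();                                               // partition 0 is free again
    System.out.println(chooser.choose(p -> !busy.contains(p))); // prints high-priority/0
  }
}
```

Even in this toy form, every choice rescans the whole buffer, and the run loop would have to keep the busy-partition set consistent with the worker threads; that bookkeeping is likely where the fragility would come from.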
I agree with you that "consumerMultiplexer.choose(true)" looks dangerous in
async mode. It looks to me like the false branch is there so that we don't
flood the pendingEnvelopeQueue (possibly without bound when one partition is
done but others have many messages remaining, as it keeps checking for any
message from that one partition). I don't plan on trying that myself.
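A minimal sketch of the flooding concern, under a deliberately simplified model: one partition, two streams, a chooser holding at most one envelope per stream, and a task that stays busy, so nothing is ever dequeued. `FloodDemo` and `fillWhileBusy` are hypothetical and do not mirror Samza's `SystemConsumers` or `AsyncRunLoop` code; the `updateChooser` flag only imitates the idea of refilling the chooser from the same stream immediately after a pick.

```java
import java.util.*;

// Hypothetical model, not Samza's real classes.
class FloodDemo {
  public static void main(String[] args) {
    System.out.println(fillWhileBusy(false, 1000, 50).size()); // prints 2
    System.out.println(fillWhileBusy(true, 1000, 50).size());  // prints 50
  }

  // Returns the pending queue after the run loop has picked `attempts` times
  // while the task is still busy. Requires backlogPerStream >= 1.
  static List<String> fillWhileBusy(boolean updateChooser, int backlogPerStream, int attempts) {
    Map<String, Integer> priority = new LinkedHashMap<>();
    priority.put("high-priority", 1);
    priority.put("low-priority", 0);

    Map<String, Integer> remaining = new HashMap<>(); // backlog not yet in the chooser
    Set<String> buffered = new LinkedHashSet<>();     // streams with an envelope in the chooser
    for (String s : priority.keySet()) {
      remaining.put(s, backlogPerStream - 1);         // one envelope starts in the chooser
      buffered.add(s);
    }

    List<String> pending = new ArrayList<>();
    for (int i = 0; i < attempts; i++) {
      // choose(): take the highest-priority stream currently buffered
      String best = null;
      for (String s : buffered) {
        if (best == null || priority.get(s) > priority.get(best)) best = s;
      }
      if (best == null) break;                        // chooser exhausted
      buffered.remove(best);
      pending.add(best);
      // Eager refill from the same stream vs. deferring it until processing
      // starts, which never happens here because the task stays busy.
      if (updateChooser && remaining.get(best) > 0) {
        remaining.put(best, remaining.get(best) - 1);
        buffered.add(best);
      }
    }
    return pending;
  }
}
```

In this model the deferred refill stops at one envelope per stream, while the eager refill keeps growing the pending queue (bounded only by the backlog). The eager version does keep picking high-priority envelopes, so it would restore priority ordering, but only by pulling the backlog into memory.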
It's been fun discussing the problem at work. We're not sure of any easy way to
make these features work together.
> AsyncRunLoop and TieredPriorityChooser fail to be used together
> ---------------------------------------------------------------
>
> Key: SAMZA-2308
> URL: https://issues.apache.org/jira/browse/SAMZA-2308
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.14.0, 0.13.1, 1.0, 1.1, 1.2
> Reporter: Sean McNealy
> Assignee: Dengpan Yin
> Priority: Major
> Attachments: PriorityChooserTest.scala
>
>
> In the single-threaded run loop, a message envelope was replaced using the
> "tryUpdate(ssp)" function each time before calling ".choose()".
> In the AsyncRunLoop that replacement is delayed until the worker is ready and
> the envelope begins processing. This allows choosing more messages to
> schedule to tasks, which keeps threads busy and allows scheduling lower
> priority messages when partitions are available. These are good things when
> threads are available. When a message is chosen for a partition that is
> already processing, it is added to the task's pendingEnvelopeQueue so that
> the run loop can choose yet more messages. But the TieredPriorityChooser may
> respond with a message for a SystemStreamPartition whose priority is lower
> than that of another nonempty SystemStreamPartition, since there is no way
> to exclude a partition or priority level from the ".choose()" operation.
> In fact, the Chooser object can frequently be exhausted of all messages. What
> happens is that, for a partition, one message from every nonempty
> SystemStream ends up in the FIFO (ArrayDeque) pendingEnvelopeQueue, which
> doesn't respect the configured priorities, so we just devolve to a round
> robin policy.
> To reproduce, run a job with the following settings:
> task.inputs=kafka.high-priority,kafka.low-priority
> job.container.thread.pool.size=2
> systems.kafka.streams.high-priority.samza.priority=1
> Expected behavior:
> Each partition will fully read the high-priority stream before reading
> messages from the low-priority stream.
> Observed behavior:
> Each partition reads from both streams as in a round robin policy.
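The mechanism in the issue description can be reproduced in a toy model. This is a minimal sketch, not Samza code: `Envelope`, `TieredChooser`, `RunLoopSim`, and `PriorityDemo` are hypothetical, there is a single partition with two streams and one task, and "processing" just records the stream name. The single-threaded loop refills the chooser right after each pick; the async-style loop keeps choosing into a FIFO pending queue and refills only when an envelope starts processing.

```java
import java.util.*;

// Entry point: compares the two loop styles on the same backlog.
class PriorityDemo {
  public static void main(String[] args) {
    System.out.println(new RunLoopSim(3, 3).runSingleThreaded());
    System.out.println(new RunLoopSim(3, 3).runAsync());
  }
}

// Hypothetical envelope: stream name plus its configured priority.
final class Envelope {
  final String stream;
  final int priority;

  Envelope(String stream, int priority) {
    this.stream = stream;
    this.priority = priority;
  }
}

// Hypothetical tiered chooser: holds at most one envelope per stream and
// always hands back the highest-priority one it currently holds.
final class TieredChooser {
  private final List<Envelope> buffered = new ArrayList<>();

  void update(Envelope e) { buffered.add(e); }

  boolean holds(String stream) {
    for (Envelope e : buffered) if (e.stream.equals(stream)) return true;
    return false;
  }

  Envelope choose() {
    Envelope best = null;
    for (Envelope e : buffered) {
      if (best == null || e.priority > best.priority) best = e;
    }
    if (best != null) buffered.remove(best);
    return best;
  }
}

// One partition, two input streams, one task.
final class RunLoopSim {
  private final Map<String, Deque<Envelope>> source = new LinkedHashMap<>();
  private final TieredChooser chooser = new TieredChooser();

  RunLoopSim(int highCount, int lowCount) {
    source.put("high-priority", backlog("high-priority", 1, highCount));
    source.put("low-priority", backlog("low-priority", 0, lowCount));
    for (String stream : source.keySet()) tryUpdate(stream);
  }

  private static Deque<Envelope> backlog(String stream, int priority, int n) {
    Deque<Envelope> d = new ArrayDeque<>();
    for (int i = 0; i < n; i++) d.add(new Envelope(stream, priority));
    return d;
  }

  // Move the next envelope for `stream` into the chooser, unless the chooser
  // already holds one for that stream or the backlog is empty.
  private void tryUpdate(String stream) {
    Deque<Envelope> d = source.get(stream);
    if (!d.isEmpty() && !chooser.holds(stream)) chooser.update(d.poll());
  }

  // Single-threaded style: refill right after each pick, before the next choose().
  List<String> runSingleThreaded() {
    List<String> order = new ArrayList<>();
    Envelope e;
    while ((e = chooser.choose()) != null) {
      order.add(e.stream);
      tryUpdate(e.stream);
    }
    return order;
  }

  // Async style: drain the chooser into a FIFO pending queue while the task is
  // busy; refill only when an envelope actually starts processing.
  List<String> runAsync() {
    List<String> order = new ArrayList<>();
    Deque<Envelope> pending = new ArrayDeque<>();
    while (true) {
      Envelope e;
      while ((e = chooser.choose()) != null) pending.add(e);
      Envelope next = pending.poll();
      if (next == null) break;
      order.add(next.stream);
      tryUpdate(next.stream);
    }
    return order;
  }
}
```

In this model the single-threaded loop processes all three high-priority envelopes before any low-priority one, while the async-style loop strictly alternates between the two streams, which matches the observed round robin behavior.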
--
This message was sent by Atlassian Jira
(v8.3.2#803003)