[
https://issues.apache.org/jira/browse/SAMZA-2?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chris Riccomini updated SAMZA-2:
--------------------------------
Attachment: SAMZA-2.0.patch
Attaching patch.
1. Added PriorityChooser, an abstract class that uses a PriorityQueue to sort
envelopes. Subclasses must implement the prioritize method.
2. Added MessageChooserFactory.
3. Added "task.chooser.class" config in TaskConfig, which defines a
MessageChooserFactory.
4. Renamed DefaultChooser to RoundRobinChooser.
5. Switched RoundRobinChooser to extend PriorityChooser.
6. Added StreamChooser, which prioritizes envelopes according to their
system/stream.
7. Added tests for StreamChooser, RoundRobinChooser, and PriorityChooser.
All tests are passing.
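
For readers without the patch open, here is a minimal sketch of the shape
described in item 1. It is not the code in SAMZA-2.0.patch. SketchEnvelope,
SketchPriorityChooser, and the update/choose method names are hypothetical
stand-ins, and the priority is assumed to be a double computed once when the
envelope is offered.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical stand-in for the envelope type handled by the container.
final class SketchEnvelope {
    final String system;
    final String stream;
    final Object message;

    SketchEnvelope(String system, String stream, Object message) {
        this.system = system;
        this.stream = stream;
        this.message = message;
    }
}

// Sketch of an abstract chooser that orders buffered envelopes by priority.
abstract class SketchPriorityChooser {
    // Pairs an envelope with the priority computed when it was offered.
    private static final class Entry {
        final SketchEnvelope envelope;
        final double priority;

        Entry(SketchEnvelope envelope, double priority) {
            this.envelope = envelope;
            this.priority = priority;
        }
    }

    // PriorityQueue puts the least element at the head, so the comparison is
    // reversed to make the largest priority come out first.
    private static final Comparator<Entry> HIGHEST_FIRST =
        (a, b) -> Double.compare(b.priority, a.priority);

    private final PriorityQueue<Entry> queue = new PriorityQueue<>(HIGHEST_FIRST);

    // Subclasses decide how important an envelope is; larger means sooner.
    protected abstract double prioritize(SketchEnvelope envelope);

    // Buffer an envelope for a later call to choose().
    public void update(SketchEnvelope envelope) {
        queue.add(new Entry(envelope, prioritize(envelope)));
    }

    // Return the highest-priority buffered envelope, or null if none is buffered.
    public SketchEnvelope choose() {
        Entry head = queue.poll();
        return head == null ? null : head.envelope;
    }
}
```

A concrete chooser would only need to override prioritize, which is the
extension point item 1 refers to.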
Items for discussion:
1. I put PriorityChooser in samza-api, so developers could implement
priority-based choosers in Java. I put RoundRobinChooser/StreamChooser in
samza-core.
2. I added a MessageChooserFactory. You could just have
MessageChooser.init(config), but this isn't the pattern we've been following
for anything else (except CheckpointManager).
3. StreamChooser de-prioritizes any envelope to -1 if its stream is not in the
priority list. You could argue an exception should be thrown. You could also
argue for falling back to round robin for de-prioritized envelopes, rather
than -1, since ordering of equal priority items is undefined in PriorityQueue.
4. I defined higher priority as a larger number. This seems more intuitive to
me, but it's the reverse of PriorityQueue's behavior: "The head of this queue
is the least element with respect to the specified ordering."
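
To make discussion items 3 and 4 concrete, here is a small sketch under stated
assumptions. It is not the StreamChooser from the patch. StreamPrioritySketch
and its method names are hypothetical, priorities are assumed to be ints keyed
by stream name, and ties are broken by arrival order using a sequence number.
That tie-break is a simpler substitute for the round-robin fallback mentioned
above; it only shows one way to avoid relying on PriorityQueue's undefined
ordering for equal elements.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

final class StreamPrioritySketch {
    // Priority given to streams that are missing from the configured list.
    static final int UNLISTED_PRIORITY = -1;

    private static final class Entry {
        final String message;
        final int priority;
        final long sequence;

        Entry(String message, int priority, long sequence) {
            this.message = message;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    // Larger priority first (reversing PriorityQueue's least-first head),
    // then earlier arrival first so equal priorities have a defined order.
    private static final Comparator<Entry> ORDER = (a, b) -> {
        int byPriority = Integer.compare(b.priority, a.priority);
        return byPriority != 0 ? byPriority : Long.compare(a.sequence, b.sequence);
    };

    private final Map<String, Integer> priorities;
    private final PriorityQueue<Entry> queue = new PriorityQueue<>(ORDER);
    private long nextSequence = 0;

    StreamPrioritySketch(Map<String, Integer> priorities) {
        this.priorities = new HashMap<>(priorities);
    }

    // Buffer a message; unlisted streams are de-prioritized instead of rejected.
    void update(String stream, String message) {
        Integer configured = priorities.get(stream);
        int priority = configured != null ? configured : UNLISTED_PRIORITY;
        queue.add(new Entry(message, priority, nextSequence++));
    }

    // Return the next message to process, or null if nothing is buffered.
    String choose() {
        Entry head = queue.poll();
        return head == null ? null : head.message;
    }
}
```

Throwing an exception for unlisted streams, the other option raised in item 3,
would replace the UNLISTED_PRIORITY branch in update.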
Review Board: https://reviews.apache.org/r/13725/
> Fine-grain control over stream consumption
> ------------------------------------------
>
> Key: SAMZA-2
> URL: https://issues.apache.org/jira/browse/SAMZA-2
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.7.0
>
> Attachments: SAMZA-2.0.patch
>
>
> Currently, samza exposes configuration in the form of
> "streams.%s.consumer.max.bytes.per.sec" for throttling the # of bytes the
> Task will read from a stream. This is a feature request for programmatic
> fine-grain control over stream consumption. The use-case is a samza task that
> will be consuming multiple streams where some streams may be from live
> systems that have stricter SLA requirements and must always be prioritized
> over other streams that may be from batch systems. The above configuration is
> not the ideal way to express this type of stream prioritization because
> configuring the "batch" streams with a low consumption rate will decrease the
> overall throughput of the system when there is no data in the "live" streams.
> Furthermore, we'll want to throttle each "batch" stream based on external
> signals that can change over time. Because of the dynamic nature of these
> external signals, we would like to have a programmatic interface that can
> dynamically change the prioritization as the signal changes.