[ https://issues.apache.org/jira/browse/SAMZA-2?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated SAMZA-2:
--------------------------------

    Attachment: SAMZA-2.0.patch

Attaching patch.

1. Added PriorityChooser, an abstract class that uses a PriorityQueue to sort
envelopes. Subclasses must implement the prioritize method.
2. Added MessageChooserFactory.
3. Added "task.chooser.class" config in TaskConfig, which defines a 
MessageChooserFactory.
4. Renamed DefaultChooser to RoundRobinChooser.
5. Switched RoundRobinChooser to extend PriorityChooser.
6. Added StreamChooser, which prioritizes envelopes according to their
system/stream.
7. Added tests for StreamChooser, RoundRobinChooser, and PriorityChooser.
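The changes above describe a PriorityChooser that buffers envelopes in a
PriorityQueue and delegates ordering to an abstract prioritize method. The
following is a minimal sketch of that shape, not the code in SAMZA-2.0.patch.
The Envelope and MessageChooser types and the update/choose method names are
hypothetical stand-ins for Samza's own envelope and chooser interfaces.
Expressing round robin as "earlier arrivals win" (FIFO order) is also an
assumption about how RoundRobinChooser might sit on top of PriorityChooser.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical stand-in for Samza's incoming message envelope.
final class Envelope {
    final String system;
    final String stream;
    final String message;

    Envelope(String system, String stream, String message) {
        this.system = system;
        this.stream = stream;
        this.message = message;
    }
}

// Hypothetical chooser contract: the container offers envelopes via update()
// and asks for the next one to process via choose().
interface MessageChooser {
    void update(Envelope envelope);

    Envelope choose(); // returns null when nothing is buffered
}

// Buffers envelopes in a PriorityQueue; subclasses only supply prioritize().
abstract class PriorityChooser implements MessageChooser {
    // PriorityQueue's head is its least element, so the comparator is
    // reversed to make the largest priority number come out first.
    private final PriorityQueue<Prioritized> queue = new PriorityQueue<>(
        Comparator.comparingLong((Prioritized p) -> p.priority).reversed());

    // Larger return value means the envelope is chosen sooner.
    protected abstract long prioritize(Envelope envelope);

    @Override
    public void update(Envelope envelope) {
        queue.add(new Prioritized(prioritize(envelope), envelope));
    }

    @Override
    public Envelope choose() {
        Prioritized head = queue.poll();
        return head == null ? null : head.envelope;
    }

    // Pairs an envelope with the priority computed when it was buffered.
    private static final class Prioritized {
        final long priority;
        final Envelope envelope;

        Prioritized(long priority, Envelope envelope) {
            this.priority = priority;
            this.envelope = envelope;
        }
    }
}

// Assumed round-robin policy: a decreasing counter gives earlier arrivals a
// higher priority, so envelopes are chosen in arrival (FIFO) order.
class RoundRobinChooser extends PriorityChooser {
    private long counter = 0;

    @Override
    protected long prioritize(Envelope envelope) {
        return -(counter++);
    }
}
```

With this split, a new policy is just another prioritize implementation; the
queueing and the reversal of PriorityQueue's least-first order live in one
place, matching the "higher number = higher priority" convention in
discussion item 4.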

All tests are passing.

Items for discussion:

1. I put PriorityChooser in samza-api, so developers could implement
priority-based choosers in Java. I put RoundRobinChooser/StreamChooser in
samza-core.
2. I added a MessageChooserFactory. You could just have 
MessageChooser.init(config), but this isn't the pattern we've been following 
for anything else (except CheckpointManager).
3. StreamChooser assigns a priority of -1 to any envelope whose system/stream
is not in the priority list. You could argue an exception should be thrown
instead. You could also argue for falling back to round robin for
de-prioritized envelopes, rather than a flat -1, since the ordering of
equal-priority items is undefined in PriorityQueue.
4. I defined higher priority as a larger number. This seems more intuitive to 
me, but it's the reverse of PriorityQueue's behavior: "The head of this queue 
is the least element with respect to the specified ordering."
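Discussion items 3 and 4 can be made concrete with a small sketch of the
StreamChooser policy: priorities looked up per system/stream, -1 for anything
unlisted, and a reversed comparator so that a larger number wins. This is an
illustration under assumptions, not the patch's code; the StreamEnvelope and
StreamChooserSketch names, the "system.stream" key format, and the
update/choose methods are all hypothetical.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical stand-in for an envelope; only system and stream drive priority.
final class StreamEnvelope {
    final String system;
    final String stream;
    final String message;

    StreamEnvelope(String system, String stream, String message) {
        this.system = system;
        this.stream = stream;
        this.message = message;
    }
}

// Sketch of the StreamChooser policy: priorities come from a
// "system.stream" -> priority map (assumed key format), unlisted streams get
// -1, and a larger number means the envelope is chosen sooner.
final class StreamChooserSketch {
    static final long UNLISTED_PRIORITY = -1L;

    private final Map<String, Long> priorities;
    private final PriorityQueue<StreamEnvelope> queue;

    StreamChooserSketch(Map<String, Long> priorities) {
        // Defensive copy: the heap assumes priorities never change while
        // envelopes are buffered.
        this.priorities = new HashMap<>(priorities);
        // PriorityQueue's head is its least element, so reverse the ordering
        // to put the highest priority number at the head.
        this.queue = new PriorityQueue<>(
            Comparator.comparingLong((StreamEnvelope e) -> prioritize(e)).reversed());
    }

    long prioritize(StreamEnvelope e) {
        return priorities.getOrDefault(e.system + "." + e.stream, UNLISTED_PRIORITY);
    }

    void update(StreamEnvelope e) {
        queue.add(e);
    }

    // Returns the highest-priority buffered envelope, or null if none.
    StreamChooserSketch.Result choose() {
        return new Result(queue.poll());
    }

    // Thin wrapper so callers can check for "nothing buffered" explicitly.
    static final class Result {
        final StreamEnvelope envelope;

        Result(StreamEnvelope envelope) {
            this.envelope = envelope;
        }

        boolean isEmpty() {
            return envelope == null;
        }
    }
}
```

With "kafka.live" at 10 and "kafka.batch" at 1, a buffered live envelope is
always chosen before a batch one, yet batch envelopes are still consumed at
full speed whenever no live envelope is buffered, which is the behaviour the
issue asks for and throttling cannot provide. Two envelopes from unlisted
streams both get -1 and come out in no guaranteed relative order, which is the
concern in item 3. Note also that PriorityQueue does not re-sort existing
elements when a key changes, so supporting priorities driven by changing
external signals would require rebuilding the queue (or similar) on each
change.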

Review Board: https://reviews.apache.org/r/13725/
                
> Fine-grain control over stream consumption
> ------------------------------------------
>
>                 Key: SAMZA-2
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.7.0
>
>         Attachments: SAMZA-2.0.patch
>
>
> Currently, Samza exposes configuration in the form of 
> "streams.%s.consumer.max.bytes.per.sec" for throttling the number of bytes a 
> task will read from a stream. This is a feature request for programmatic 
> fine-grain control over stream consumption. The use-case is a samza task that 
> will be consuming multiple streams where some streams may be from live 
> systems that have stricter SLA requirements and must always be prioritized 
> over other streams that may be from batch systems. The above configuration is 
> not the ideal way to express this type of stream prioritization because 
> configuring the "batch" streams with a low consumption rate will decrease the 
> overall throughput of the system when there is no data in the "live" streams. 
> Furthermore, we'll want to throttle each "batch" stream based on external 
> signals that can change over time. Because of the dynamic nature of these 
> external signals, we would like to have a programmatic interface that can 
> dynamically change the prioritization as the signal changes.
