[ https://issues.apache.org/jira/browse/SAMZA-2?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Riccomini updated SAMZA-2:
--------------------------------
Attachment: SAMZA-2.2.patch
Updated patch with feedback from the JIRA comments and the design proposal.
Items to watch out for:
1. I added MessageChooser.start, MessageChooser.stop, and
MessageChooser.register back in. The BootstrappingChooser needs the last offset
for each bootstrap stream in order to mark streams as "caught up" in cases
where the last checkpointed offset == the offset of the last message in the
stream, since no further message will arrive to trigger that check. This data
is not available at wire-up time; it's only available after the TaskInstance
restores its offsets and registers with SystemConsumers. I'm all ears if you
guys have a better/cleaner idea.
2. With the DefaultChooser, a bootstrap stream is not re-processed in its
entirety every time the StreamTask starts up. It is only processed from the
last checkpointed offset forward. I think this is actually desirable, since the
usual use case for a bootstrap stream is to load the stream into a key/value
store. The key/value store is itself restored on startup, so there's no point
in re-processing the whole bootstrap stream; you can simply pick up where you
left off.
3. DefaultChooserFactory is the default factory. To break ties between
same-priority envelopes, and to batch when no priority streams are configured,
the DefaultChooser itself wraps another MessageChooser. This is kind of weird,
because we end up with two configs: task.chooser.class and
task.chooser.wrapped.class. The first is the actual pluggable config that
the SamzaContainer uses via SystemConsumers. The second is the chooser the
DefaultChooser uses to break priority ties, or in cases where it's just going
to batch.
4. It's unclear to me whether there's a better, more idiomatic Scala approach
to composing the choosers (mixins, traits, etc.). Open to advice.
5. Batching is disabled by default.
6. This code is quite complicated. I tried to make it as simple as possible,
but we're just trying to do a lot. There's probably an argument to be made for
having three simple choosers instead of one complex one that does
bootstrapping, batching, and prioritizing.
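The "caught up" check from item 1 can be illustrated with a minimal sketch. It assumes simplified, hypothetical types (partitions as strings, offsets as longs) instead of Samza's real stream-partition and offset classes, and the names BootstrapTracker, onChosen, and isBootstrapped are made up for illustration. It shows why the check also has to run at register time: when the checkpointed offset already equals the last offset, no further envelope will arrive to mark the partition as caught up.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified sketch of the "caught up" bookkeeping a
 * bootstrapping chooser needs. Not Samza's actual API.
 */
class BootstrapTracker {
    // Offset of the last message in each bootstrap partition (the data the
    // patch obtains via a last-offsets lookup on the system admin).
    private final Map<String, Long> lastOffsets;
    // Bootstrap partitions that have not been read up to their last offset.
    private final Set<String> laggingPartitions = new HashSet<>();

    BootstrapTracker(Map<String, Long> lastOffsets) {
        this.lastOffsets = new HashMap<>(lastOffsets);
        // Only partitions with a known last offset can lag; partitions absent
        // from the map (e.g. empty streams) are never treated as lagging.
        this.laggingPartitions.addAll(lastOffsets.keySet());
    }

    /**
     * Called once the checkpointed offset is known (the role register plays
     * in the patch). If the checkpoint already points at the last message,
     * no envelope will ever arrive for this partition, so mark it here.
     */
    public void register(String partition, long checkpointedOffset) {
        markIfCaughtUp(partition, checkpointedOffset);
    }

    /** Called for each envelope the chooser hands to the task. */
    public void onChosen(String partition, long offset) {
        markIfCaughtUp(partition, offset);
    }

    /** True once every bootstrap partition has reached its last offset. */
    public boolean isBootstrapped() {
        return laggingPartitions.isEmpty();
    }

    private void markIfCaughtUp(String partition, long offset) {
        Long last = lastOffsets.get(partition);
        if (last != null && offset >= last) {
            laggingPartitions.remove(partition);
        }
    }
}
```

A bootstrapping chooser built along these lines would hold back envelopes from non-bootstrap streams until isBootstrapped() returns true.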
Remaining TODOs:
1. Add logging.
2. Write KafkaSystemAdmin.getLastOffsets
3. Update documentation.
4. Add a bootstrapping task to the integration test framework.
All tests pass.
> Fine-grain control over stream consumption
> ------------------------------------------
>
> Key: SAMZA-2
> URL: https://issues.apache.org/jira/browse/SAMZA-2
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.7.0
>
> Attachments: SAMZA-2.0.patch, SAMZA-2.1.patch, SAMZA-2.2.patch
>
>
> Currently, Samza exposes configuration in the form of
> "streams.%s.consumer.max.bytes.per.sec" for throttling the number of bytes
> the task will read from a stream. This is a feature request for programmatic,
> fine-grained control over stream consumption. The use case is a Samza task
> that consumes multiple streams, where some streams may come from live
> systems that have stricter SLA requirements and must always be prioritized
> over other streams that may come from batch systems. The above configuration
> is not an ideal way to express this type of stream prioritization, because
> configuring the "batch" streams with a low consumption rate will decrease the
> overall throughput of the system when there is no data in the "live" streams.
> Furthermore, we'll want to throttle each "batch" stream based on external
> signals that can change over time. Because of the dynamic nature of these
> signals, we would like a programmatic interface that can dynamically change
> the prioritization as the signals change.
> Design proposal:
> https://wiki.apache.org/samza/Pluggable%20MessageChooser
> Review board:
> https://reviews.apache.org/r/13725/
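The live-over-batch prioritization requested in the issue, combined with the tie-breaking wrapped chooser described in item 3 of the patch notes, can be sketched in a few dozen lines of Java. This is a hypothetical, simplified model, not Samza's MessageChooser API: Envelope, Chooser, FifoChooser, and PriorityChooser are illustrative names, and setPriority stands in for whatever external signal adjusts priorities at runtime. Streams without an explicit priority default to 0.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

/** Hypothetical stand-in for a message envelope: stream name plus payload. */
final class Envelope {
    final String stream;
    final String payload;

    Envelope(String stream, String payload) {
        this.stream = stream;
        this.payload = payload;
    }
}

/** Hypothetical chooser interface, loosely modeled on update/choose. */
interface Chooser {
    void update(Envelope envelope);

    Envelope choose(); // returns null when nothing is buffered
}

/** Tie-breaker: first in, first out among same-priority envelopes. */
final class FifoChooser implements Chooser {
    private final Deque<Envelope> queue = new ArrayDeque<>();

    public void update(Envelope envelope) { queue.addLast(envelope); }

    public Envelope choose() { return queue.pollFirst(); }
}

/** Always serves the highest-priority level that has a buffered envelope. */
final class PriorityChooser implements Chooser {
    private final Map<String, Integer> priorities = new HashMap<>();
    // One wrapped chooser per priority level, iterated highest priority first.
    private final TreeMap<Integer, Chooser> levels = new TreeMap<>(Comparator.reverseOrder());
    private final Supplier<Chooser> wrappedFactory;

    PriorityChooser(Supplier<Chooser> wrappedFactory) {
        this.wrappedFactory = wrappedFactory;
    }

    /** Can be called at any time, e.g. in response to an external signal. */
    void setPriority(String stream, int priority) {
        priorities.put(stream, priority);
    }

    public void update(Envelope envelope) {
        int priority = priorities.getOrDefault(envelope.stream, 0);
        levels.computeIfAbsent(priority, p -> wrappedFactory.get()).update(envelope);
    }

    public Envelope choose() {
        for (Chooser level : levels.values()) {
            Envelope next = level.choose(); // wrapped chooser breaks ties
            if (next != null) {
                return next;
            }
        }
        return null;
    }
}
```

Here the Supplier plays the role the patch gives to task.chooser.wrapped.class: each priority level gets its own wrapped chooser, which decides among same-priority envelopes. A priority change applies to envelopes that arrive after the call; envelopes already buffered stay at their old level. Unlike a bytes-per-second throttle, a low-priority stream is still consumed at full speed whenever the higher-priority streams have nothing buffered.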