Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Samza Wiki" for change notification.
The "Pluggable MessageChooser" page has been changed by ChrisRiccomini:
https://wiki.apache.org/samza/Pluggable%20MessageChooser?action=diff&rev1=1&rev2=2

= Pluggable MessageChooser =

== Introduction ==

When a Samza job is reading messages from more than one system/stream/partition (SSP), a decision needs to be made about the order in which the messages from these SSPs are processed. For example, if a message is available for (stream: AdViewEvent, partition: 7), and so is a message for (stream: AdClickEvent, partition: 7), which message should the SamzaContainer process next?

The order in which the SamzaContainer processes messages turns out to matter for a number of use cases.

=== Use cases ===

==== Streams with different SLAs ====

==== Bootstrapping state streams ====

==== Time-aligned streams ====

=== MessageChooser ===

In Samza 0.7.0 we've introduced a class called MessageChooser. This is an object that is given up to one message at a time from each input stream/partition pair (when they're available). The chooser's job is to return the incoming message envelope that should be processed next.

In plain English, Samza says, "I have IncomingMessageEnvelopes from stream/partition X and stream/partition Y; which should I process next?" It's the MessageChooser's job to answer this question.

The MessageChooser's interface currently looks like this:

{{{
interface MessageChooser {
  /**
   * Notify the chooser that a new envelope is available for processing. A
   * MessageChooser will receive, at most, one outstanding envelope per
   * system/stream/partition combination. For example, if update is called for
   * partition 7 of kafka.mystream, then update will not be called with an
   * envelope from partition 7 of kafka.mystream until the previous envelope has
   * been returned via the choose method. Update will only be invoked after the
   * chooser has been started.
   *
   * @param envelope
   *          An unprocessed envelope.
   */
  void update(IncomingMessageEnvelope envelope);

  /**
   * The choose method is invoked when the SamzaContainer is ready to process a
   * new message. The chooser may elect to return any envelope that it's been
   * given via the update method, which hasn't yet been returned. Choose will
   * only be called after the chooser has been started.
   *
   * @return The next envelope to process, or null if the chooser has no
   *         messages or doesn't want to process any at the moment.
   */
  IncomingMessageEnvelope choose();
}
}}}

The MessageChooser is used as follows:

 1. SystemConsumers buffers messages from all SSPs as they become available.
 2. If the MessageChooser has no message for a given SSP, and SystemConsumers has a message in its buffer for that SSP, the MessageChooser is updated once with the next message in the buffer.
 3. When the SamzaContainer is ready to process another message, it calls SystemConsumers.choose, which in turn calls MessageChooser.choose.

The contract between the MessageChooser and the SystemConsumers class is:

 * Update can be called multiple times before choose is called.
 * A null return from MessageChooser.choose means that no envelopes should be processed at the moment.
 * A MessageChooser may elect to return null when choose is called, even if it has been given unprocessed messages via the update method.
 * A MessageChooser will not have any of its in-memory state restored in the event of a failure.
 * Blocking operations (such as Thread.sleep) will block all processing in the entire SamzaContainer.
 * A MessageChooser should never return the same envelope more than once.
 * Non-deterministic (e.g. time-based) MessageChoosers are allowed.
 * A MessageChooser does not need to be thread-safe.
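The three-step flow and the contract above can be modeled in a few lines of Java. This is a minimal, hypothetical sketch, not Samza's SystemConsumers: `Envelope` is a simplified stand-in for IncomingMessageEnvelope, SSPs are plain strings such as "kafka.a.0", and `MiniSystemConsumers` and `FifoChooser` are names invented for illustration. The `outstanding` set is what ensures the chooser never holds more than one envelope per SSP.

```java
import java.util.*;

/** Simplified, hypothetical stand-in for IncomingMessageEnvelope. */
final class Envelope {
  final String systemStream; // e.g. "kafka.a"
  final int partition;
  final String offset;       // kept as an opaque string
  Envelope(String systemStream, int partition, String offset) {
    this.systemStream = systemStream;
    this.partition = partition;
    this.offset = offset;
  }
  String ssp() { return systemStream + "." + partition; }
  @Override public String toString() { return ssp() + "@" + offset; }
}

/** Same shape as the MessageChooser interface above, using the stand-in envelope. */
interface MessageChooser {
  void update(Envelope envelope);
  Envelope choose();
}

/** Trivial chooser: returns envelopes in the order they arrived via update. */
final class FifoChooser implements MessageChooser {
  private final Deque<Envelope> pending = new ArrayDeque<>();
  public void update(Envelope envelope) { pending.addLast(envelope); }
  public Envelope choose() { return pending.pollFirst(); } // null when nothing is pending
}

/** Hypothetical model of the SystemConsumers side of the contract. */
final class MiniSystemConsumers {
  private final MessageChooser chooser;
  private final Map<String, Deque<Envelope>> buffers = new LinkedHashMap<>();
  private final Set<String> outstanding = new HashSet<>(); // SSPs whose envelope is held by the chooser

  MiniSystemConsumers(MessageChooser chooser) { this.chooser = chooser; }

  /** Step 1: buffer envelopes as the underlying consumers return them. */
  void buffer(Envelope envelope) {
    buffers.computeIfAbsent(envelope.ssp(), k -> new ArrayDeque<>()).addLast(envelope);
    refresh();
  }

  /** Step 2: update the chooser once for each SSP that has nothing outstanding. */
  private void refresh() {
    for (Map.Entry<String, Deque<Envelope>> entry : buffers.entrySet()) {
      if (!outstanding.contains(entry.getKey()) && !entry.getValue().isEmpty()) {
        chooser.update(entry.getValue().pollFirst());
        outstanding.add(entry.getKey());
      }
    }
  }

  /** Step 3: the container asks for the next envelope; null means "process nothing now". */
  Envelope choose() {
    Envelope chosen = chooser.choose();
    if (chosen != null) {
      outstanding.remove(chosen.ssp()); // this SSP may now receive its next buffered envelope
      refresh();
    }
    return chosen;
  }
}
```

Buffering two envelopes for kafka.a.0 and one for kafka.b.0 yields kafka.a.0@0, kafka.b.0@0, kafka.a.0@1: the second kafka.a.0 envelope only reaches the chooser after the first one has been chosen.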

== Problem ==

The MessageChooser is currently hard-wired to a default implementation, which uses a round-robin strategy to pick messages from all streams with outstanding messages. This implementation is insufficient for a number of use cases.

=== Problems ===

Questions worth asking are:

 1. Is the MessageChooser the appropriate way to support the use cases outlined above?
 2. Can we implement a single default MessageChooser that solves all of the use cases listed above?
 3. Should the MessageChooser be pluggable?
 4. Is the MessageChooser's interface correct?

In legacy (non-open-sourced) versions of Samza, there was a configuration called "streams.%s.consumer.max.bytes.per.sec" for throttling the number of bytes the task will read from a stream. In Samza 0.7.0, the version that is currently in Apache Samza's master git repository, this throttling feature has been removed, and we've introduced the concept of a MessageChooser instead. Our justification for removing this style of throttling in 0.7.0 is that what folks usually want is:

 1. Per-stream prioritization (always take messages from stream X before messages from stream Y if both have messages available).
 2. Per-process isolation (I want to know that my process gets at least X% of the CPU, disk, network, etc.).
 3. A way to intentionally ignore a stream even if messages are available. This can be used to prevent a Samza job from hammering a remote database or web service. Samza jobs can process messages much faster than a remote DB or service can handle RPC requests. When a Samza job triggers an RPC call to a remote web service to join data to events, for instance, developers want a way to limit processing so that they don't overwhelm the web service. By intentionally ignoring messages from a stream, you can slow down the rate at which the StreamTask receives messages, and thereby slow down the rate at which RPC calls are made.
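For illustration only, item 3 can be expressed within the MessageChooser contract, since a chooser may return null or hold an envelope back. The page itself addresses (3) through a separate Throttling proposal, so this is just one possible mechanism. The sketch is hypothetical: `PausingChooser`, `FifoChooser`, and the simplified `Envelope` stand-in are invented names. Because SystemConsumers hands the chooser at most one envelope per SSP until that envelope is chosen, withholding it stops the task from processing that SSP at all, rather than capping a byte rate.

```java
import java.util.*;

/** Simplified, hypothetical stand-in for IncomingMessageEnvelope. */
final class Envelope {
  final String systemStream; // e.g. "kafka.events"
  final int partition;
  final String offset;
  Envelope(String systemStream, int partition, String offset) {
    this.systemStream = systemStream;
    this.partition = partition;
    this.offset = offset;
  }
  String ssp() { return systemStream + "." + partition; }
  @Override public String toString() { return ssp() + "@" + offset; }
}

interface MessageChooser {
  void update(Envelope envelope);
  Envelope choose();
}

/** Delegate used below: returns envelopes in arrival order. */
final class FifoChooser implements MessageChooser {
  private final Deque<Envelope> pending = new ArrayDeque<>();
  public void update(Envelope envelope) { pending.addLast(envelope); }
  public Envelope choose() { return pending.pollFirst(); }
}

/** Hypothetical decorator that ignores paused streams until they are resumed. */
final class PausingChooser implements MessageChooser {
  private final MessageChooser delegate;
  private final Set<String> paused = new HashSet<>();    // system.stream names
  private final List<Envelope> held = new ArrayList<>(); // envelopes withheld while paused

  PausingChooser(MessageChooser delegate) { this.delegate = delegate; }

  void pause(String systemStream) { paused.add(systemStream); }

  void resume(String systemStream) {
    paused.remove(systemStream);
    // Hand any withheld envelopes for this stream to the delegate.
    Iterator<Envelope> it = held.iterator();
    while (it.hasNext()) {
      Envelope e = it.next();
      if (e.systemStream.equals(systemStream)) {
        delegate.update(e);
        it.remove();
      }
    }
  }

  public void update(Envelope envelope) {
    if (paused.contains(envelope.systemStream)) held.add(envelope);
    else delegate.update(envelope);
  }

  // May return null even while envelopes are held, which the contract allows.
  public Envelope choose() { return delegate.choose(); }
}
```

The stream names used with this sketch, such as "kafka.rpcJoin", are hypothetical. Pausing that stream lets other streams proceed, and resuming it releases the withheld envelope.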

People don't actually want to voluntarily go slower than they can (which is what the old throttler was doing).

For (2), we are going to rely on CGroups.

For (3), we're adding a new feature described in the [[Throttling]] design proposal and [[https://issues.apache.org/jira/browse/SAMZA-24|SAMZA-24]].

This design proposal describes how we will allow developers to define which message should be processed next when multiple stream/partitions have outstanding messages available.

=== Use Case ===

An example use case is a Samza task that consumes multiple streams, where some streams come from live systems that have stricter SLA (low-latency) requirements and must always be prioritized over other streams that come from batch systems. The legacy throttling approach that we were using before (giving the batch system a max of 1 MB/s) is not the ideal way to express this type of stream prioritization, because configuring the "batch" streams with a low consumption rate will decrease the overall throughput of the system when there is no data in the "live" streams. Furthermore, we might want to throttle each "batch" stream based on external signals that can change over time. Because of the dynamic nature of these external signals, we would like a programmatic interface that can change the prioritization as the signal changes.

Our current 0.7.0 MessageChooser implementation just round-robins between streams. When messages are available from the "live" streams, this has the effect of throttling the "batch" streams you're talking about. When messages are not available from the live streams, the chooser will just choose the batch messages every time, thereby causing the task to process messages from the batch streams as fast as possible.

One caveat here is that the MessageChooser is currently not pluggable. We've hard-coded this behavior into a "DefaultChooser" for now.
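The round-robin behavior described above falls out of a plain FIFO queue once the one-envelope-per-SSP rule is in place: an SSP re-enters the queue only after its previous envelope has been chosen. The sketch below is hypothetical. `RoundRobinChooser` here is not Samza's DefaultChooser, and `ChooserSim` is a toy driver standing in for SystemConsumers. It shows live and batch alternating while both have a backlog, with batch taken on every call once live is drained.

```java
import java.util.*;

/** Simplified, hypothetical stand-in for IncomingMessageEnvelope. */
final class Envelope {
  final String systemStream;
  final int partition;
  final String offset;
  Envelope(String systemStream, int partition, String offset) {
    this.systemStream = systemStream;
    this.partition = partition;
    this.offset = offset;
  }
  String ssp() { return systemStream + "." + partition; }
  @Override public String toString() { return ssp() + "@" + offset; }
}

interface MessageChooser {
  void update(Envelope envelope);
  Envelope choose();
}

/** Round robin as a FIFO queue; relies on SystemConsumers sending one envelope per SSP at a time. */
final class RoundRobinChooser implements MessageChooser {
  private final Deque<Envelope> queue = new ArrayDeque<>();
  public void update(Envelope envelope) { queue.addLast(envelope); }
  public Envelope choose() { return queue.pollFirst(); }
}

/** Toy driver: each stream has `backlog` messages on partition 0; returns the chosen stream names. */
final class ChooserSim {
  static List<String> run(MessageChooser chooser, Map<String, Integer> backlog, int steps) {
    Map<String, Integer> next = new HashMap<>(); // next offset to hand to the chooser, per stream
    for (String s : backlog.keySet()) {
      next.put(s, 0);
      if (backlog.get(s) > 0) {
        chooser.update(new Envelope(s, 0, "0"));
        next.put(s, 1);
      }
    }
    List<String> picked = new ArrayList<>();
    for (int i = 0; i < steps; i++) {
      Envelope e = chooser.choose();
      if (e == null) break; // nothing left to process
      picked.add(e.systemStream);
      int n = next.get(e.systemStream);
      if (n < backlog.get(e.systemStream)) { // hand over this SSP's next buffered envelope
        chooser.update(new Envelope(e.systemStream, 0, String.valueOf(n)));
        next.put(e.systemStream, n + 1);
      }
    }
    return picked;
  }
}
```

With a backlog of 2 live and 4 batch messages, the order is live, batch, live, batch, batch, batch. Batch is only held back while live has data, which is the effect described above.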
The motivation for keeping it from being pluggable is that we're not entirely certain of the interface yet, and we're also not sure how well it will integrate with our fault tolerance and state management semantics (what happens if a pluggable MessageChooser is maintaining state, and there's a failure?).

This leads to the question: is the round-robin approach the best approach for a generic message chooser (given that it's not pluggable at this time)? I think the answer is no. A better approach would simply be a priority list that says, "Always process stream X, then stream Y, then stream Z." This would actually be better for your use case, since you could ALWAYS process "live" messages first, rather than just round-robin between live and batch streams.

Another alternative would be, "always process the stream that is farthest behind," for some definition of "farthest behind". Some definitions include:

 1. Farthest behind in wall-clock time (now() - timestamp in message).
 2. Farthest number of messages behind the high watermark (when reading from Kafka).

What's your take on this new 0.7.0 feature? I think the existing MessageChooser would accommodate your needs as is, but it could be made even better with a priority list.

Questions:

 1. Should the MessageChooser be pluggable?
 2. Is the MessageChooser interface correct?
 3. What should the default implementation do?

== Potential Solutions ==

== Proposal ==

== Open Questions ==

I believe that this implementation guarantees the following:

 1. The chooser will have a message from all streams that have not yet hit head when choose is called.
 2. The main event loop will not be blocked while we are waiting for messages from streams that have yet to hit head.

The first guarantee (1) is important because it means that a MessageChooser can bootstrap a stream from offset 0 to head simply by always choosing the envelope for that stream if it's available.
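A priority list like the one proposed above is also enough to bootstrap a stream, provided guarantee (1) holds. Below is a minimal sketch of such a chooser. `PriorityListChooser` and the simplified `Envelope` stand-in are hypothetical. Streams missing from the list are assumed to come after all listed streams, in no particular order.

```java
import java.util.*;

/** Simplified, hypothetical stand-in for IncomingMessageEnvelope. */
final class Envelope {
  final String systemStream;
  final int partition;
  final String offset;
  Envelope(String systemStream, int partition, String offset) {
    this.systemStream = systemStream;
    this.partition = partition;
    this.offset = offset;
  }
  String ssp() { return systemStream + "." + partition; }
  @Override public String toString() { return ssp() + "@" + offset; }
}

interface MessageChooser {
  void update(Envelope envelope);
  Envelope choose();
}

/** "Always process stream X, then stream Y, then stream Z." */
final class PriorityListChooser implements MessageChooser {
  private final List<String> order; // system.stream names, highest priority first
  private final Map<String, Deque<Envelope>> pending = new HashMap<>();

  PriorityListChooser(List<String> order) { this.order = order; }

  public void update(Envelope envelope) {
    pending.computeIfAbsent(envelope.systemStream, k -> new ArrayDeque<>()).addLast(envelope);
  }

  public Envelope choose() {
    // Take from the first listed stream that has anything pending.
    for (String stream : order) {
      Deque<Envelope> q = pending.get(stream);
      if (q != null && !q.isEmpty()) return q.pollFirst();
    }
    // Fall back to unlisted streams (listed ones are empty at this point).
    for (Deque<Envelope> q : pending.values()) {
      if (!q.isEmpty()) return q.pollFirst();
    }
    return null;
  }
}
```

With the order ["kafka.currencyCode", "kafka.transactions"], a transactions envelope waits as long as a currencyCode envelope is pending. Strict priority like this starves lower streams by design, which is what the live-over-batch use case asks for.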
Consider a case where you want to read the entire currencyCode stream, which has mappings of currency code (e.g. USD) to symbols (e.g. $), and is partitioned by currency code. You might want to join these symbols to a stream called transactions, which is also partitioned by currency and has a schema like {"country": "USD", "amount": 1234}. You could then have your StreamTask join the currency symbol to each transaction message, and emit messages like {"amount": "$1234"}.

To bootstrap the currencyCode stream, you need to read it from offset 0 all the way to the last message in the stream (what I'm calling head). It is not desirable to read any message from the transactions stream until the currencyCode stream has been fully read, or else you might try to join a transaction message to a currency code that hasn't yet been read. Using the strategy above, SamzaContainer would get the offset of the last message in the stream at the time it starts; let's say the offset is 240. If the currencyCode stream is configured with samza.reset.offset, then SamzaContainer will initialize the TaskInstance to start reading from offset 0. A MessageChooser could then be implemented that always chooses envelopes from currencyCode if they're available, and otherwise picks envelopes from the transactions stream. Guarantee (1) ensures that the chooser always has a currencyCode message available when choose is called, as long as offset 240 has not yet been reached, so the chooser will choose currencyCode messages 0-240 before it chooses any transactions envelopes. This results in a bootstrap behavior where currencyCode is read in its entirety for a given partition before any transactions messages are processed for that partition.

The second guarantee (2) is important because it means that the window and commit methods will still be called at their expected intervals while the SystemConsumers class is waiting for messages from streamsYetToHitHead.
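Both guarantees rest on one check that SystemConsumers makes before calling choose, described in steps 5 to 7 of the implementation plan on this page. The following is a hypothetical sketch of that bookkeeping. `BootstrapGate` and the simplified `Envelope` stand-in are invented names, and SSPs are plain strings. When `mayChoose()` returns false, the caller would skip choose for this iteration instead of blocking, so window and commit still run.

```java
import java.util.*;

/** Simplified, hypothetical stand-in for IncomingMessageEnvelope. */
final class Envelope {
  final String systemStream;
  final int partition;
  final String offset;
  Envelope(String systemStream, int partition, String offset) {
    this.systemStream = systemStream;
    this.partition = partition;
    this.offset = offset;
  }
  String ssp() { return systemStream + "." + partition; }
  @Override public String toString() { return ssp() + "@" + offset; }
}

/** Hypothetical bookkeeping for the "yet to hit head" rule. */
final class BootstrapGate {
  private final Map<String, String> lastOffsets;         // SSP -> offset of the last message at startup ("head")
  private final Set<String> yetToHitHead;                // SSPs that have not yet delivered their head message
  private final Set<String> inChooser = new HashSet<>(); // SSPs whose envelope the chooser currently holds

  BootstrapGate(Map<String, String> lastOffsets) {
    this.lastOffsets = lastOffsets;
    this.yetToHitHead = new HashSet<>(lastOffsets.keySet()); // initially every input SSP
  }

  /** Record that the chooser was updated with this envelope. */
  void updated(Envelope envelope) { inChooser.add(envelope.ssp()); }

  /** choose may be called only if every SSP that has yet to hit head has an envelope in the chooser. */
  boolean mayChoose() { return inChooser.containsAll(yetToHitHead); }

  /** Record that the chooser returned this envelope. */
  void chosen(Envelope envelope) {
    inChooser.remove(envelope.ssp());
    if (envelope.offset.equals(lastOffsets.get(envelope.ssp()))) {
      yetToHitHead.remove(envelope.ssp()); // reached head; stop gating on this SSP
    }
  }

  boolean caughtUp(String ssp) { return !yetToHitHead.contains(ssp); }
}
```

Because step 5 initializes the set with all input SSPs, the transactions SSP is also gated until its own last offset has been chosen. In this sketch, choose therefore waits for a transactions envelope as well as for currencyCode.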
For example, consider the case where the current currencyCode offset is 237 (less than the last offset of 240), and the stream becomes unavailable for 10 minutes. If the MessageChooser has already chosen all envelopes from currencyCode, then we can't call choose on it, since it might accidentally choose a transactions message. Instead, we have to wait for the currencyCode stream to become available again, so that we can read the next message and call MessageChooser.update with it. Rather than blocking with systemConsumer.poll(.., -1), we can instead poll the stream with a non-blocking timeout (i.e. 0), and then simply decide not to call the MessageChooser.choose method. In such a case, process will return null, which denotes that no new messages are available to process, and the window, send, and commit methods get called as usual. Then the process loop repeats, and the currencyCode stream is polled again to see whether new messages are available.

 1. Add a getLastOffset(String streamName, Partition partition): String method to the SystemAdmin interface (*).
 2. Modify SamzaContainer to get the SystemAdmin for each input stream's system.
 3. Modify SamzaContainer to use SystemAdmin.getLastOffset to construct a Map[SystemStreamPartition, String], where the value is the last offset.
 4. Modify SystemConsumers to take a lastOffsets: Map[SystemStreamPartition, String] parameter in the constructor.
 5. Modify SystemConsumers to have a streamsYetToHitHead: Set[SystemStreamPartition] variable that is initialized to contain all input system stream partitions.
 6. Modify SystemConsumers so that chooser.choose is called only if streamsYetToHitHead is empty, or the chooser has been given a message from every stream that has yet to hit head.
 7. Modify SystemConsumers to remove chooser.choose.getSystemStreamPartition from streamsYetToHitHead whenever lastOffsets(envelope.getSystemStreamPartition).equals(envelope.getOffset).
 8. Modify SystemConsumers.register to remove the SystemStreamPartition from streamsYetToHitHead if lastReadOffset.equals(lastOffsets(systemStreamPartition)).
 9. Add a getLastOffset(String streamName, Partition partition): String method to KafkaSystemAdmin.

(*) Note: we might want to batch these offset requests into getLastOffset(Set[SystemStreamPartition] systemStreamPartitions): Map[SystemStreamPartition, String].

DefaultChooser

{{{
task.chooser.priorities.kafka.currencyCodes=1
task.chooser.priorities.kafka.transactions=0
task.chooser.priorities.<system-name>.<stream-name>=<priority>
}}}

Problem: how do we choose between two streams that are both not yet caught up, and have the same priority? Candidate criteria:

 * offset (does a lexicographical sort on the offset string mean anything for this?)
 * number of messages behind head
 * percent behind
 * wall-clock time behind
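The DefaultChooser configuration above could be combined with one of the listed tie-breakers. This is a hypothetical sketch, not a settled design. `ConfiguredPriorityChooser` and the simplified `Envelope` stand-in are invented. The sketch makes several assumptions:

 * Higher numbers mean higher priority, so kafka.currencyCodes=1 is read before kafka.transactions=0.
 * Unlisted streams default to priority 0.
 * Ties go to the SSP with the most messages behind head.

Counting messages behind head requires numeric offsets, as Kafka's are. Comparing offset strings lexicographically would sort "10" before "9".

```java
import java.util.*;

/** Simplified, hypothetical stand-in for IncomingMessageEnvelope. */
final class Envelope {
  final String systemStream;
  final int partition;
  final String offset; // assumed to be a numeric (Kafka-style) offset in this sketch
  Envelope(String systemStream, int partition, String offset) {
    this.systemStream = systemStream;
    this.partition = partition;
    this.offset = offset;
  }
  String ssp() { return systemStream + "." + partition; }
  @Override public String toString() { return ssp() + "@" + offset; }
}

interface MessageChooser {
  void update(Envelope envelope);
  Envelope choose();
}

/** Priority from task.chooser.priorities.<system>.<stream>; ties broken by messages behind head. */
final class ConfiguredPriorityChooser implements MessageChooser {
  static final String PREFIX = "task.chooser.priorities.";
  private final Map<String, Integer> priorities = new HashMap<>(); // system.stream -> priority
  private final Map<String, Long> headOffsets;                     // SSP -> last offset at startup
  private final List<Envelope> pending = new ArrayList<>();

  ConfiguredPriorityChooser(Map<String, String> config, Map<String, Long> headOffsets) {
    for (Map.Entry<String, String> kv : config.entrySet()) {
      if (kv.getKey().startsWith(PREFIX)) {
        priorities.put(kv.getKey().substring(PREFIX.length()), Integer.parseInt(kv.getValue().trim()));
      }
    }
    this.headOffsets = headOffsets;
  }

  private int priority(Envelope e) { return priorities.getOrDefault(e.systemStream, 0); }

  /** Messages between this envelope and head; 0 if head is unknown. */
  private long behindHead(Envelope e) {
    long offset = Long.parseLong(e.offset);
    return headOffsets.getOrDefault(e.ssp(), offset) - offset;
  }

  public void update(Envelope envelope) { pending.add(envelope); }

  public Envelope choose() {
    Envelope best = null;
    for (Envelope e : pending) {
      if (best == null
          || priority(e) > priority(best)
          || (priority(e) == priority(best) && behindHead(e) > behindHead(best))) {
        best = e;
      }
    }
    if (best != null) pending.remove(best);
    return best; // null when nothing is pending
  }
}
```

With currencyCodes at priority 1 and two transactions partitions at priority 0 that are 10 and 60 messages behind head, the order is currencyCodes first, then the partition 60 behind, then the one 10 behind.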
