Dear Wiki user, You have subscribed to a wiki page or wiki category on "Samza Wiki" for change notification.
The "Pluggable MessageChooser" page has been changed by ChrisRiccomini: https://wiki.apache.org/samza/Pluggable%20MessageChooser?action=diff&rev1=3&rev2=4

= Pluggable !MessageChooser =

== Introduction ==

When a Samza job is reading messages from more than one system/stream/partition (SSP), a decision needs to be made about the order in which the messages from these SSPs are processed. For example, if a message is available for (stream: AdViewEvent, partition: 7), and so is a message for (stream: AdClickEvent, partition: 7), which message should the SamzaContainer process next?

It turns out that the order in which messages are processed can be determined based on any number of strategies:

 * Message contents (e.g. pick the message with the earliest stream time).
 * Backlog (pick whichever stream is farther behind the head of the stream).
 * Initialization (always pick stream X until it's caught up to the head of the stream).
 * Quality of service (if I've picked too many messages from topic X, don't pick from it for a while, so we don't starve other topics).
 * Round robin (just cycle through the streams in a fixed order).
 * Batching (pick 100 messages from stream X, then 100 messages from stream Y, etc.).

=== Use cases ===

[...]

The problem that this kind of job runs into is that it wants to keep the AdView events and AdClick events as closely aligned (based on the time the messages were sent) as possible, because close alignment allows the job to shrink its buffering window, which frees up memory and disk space. If you can keep AdView and AdClick messages within 2 minutes of each other, you only need a 2 minute buffer. It doesn't make any sense to process and buffer an AdView event from two seconds ago if an AdClick from 1 minute ago can be processed and its AdView removed from the buffer (thereby freeing up space).
=== !MessageChooser ===

In Samza 0.7.0 we've introduced a class called !MessageChooser. This is an object that is given up to one message at a time from each input stream/partition pair (when they're available). The chooser's job is to return the incoming message envelope that should be processed next.

In plain-spoken English, Samza says, "I have IncomingMessageEnvelopes from stream/partition X and stream/partition Y; which should I process next?" It's the !MessageChooser's job to answer this question.

The !MessageChooser's interface currently looks like this:

{{{
interface MessageChooser {
  /**
   * Notify the chooser that a new envelope is available for processing. A
   * MessageChooser will receive, at most, one outstanding envelope per
   * system/stream/partition combination. For example, if update is called for
   * partition 7 of kafka.mystream, then update will not be called with an
   * envelope from partition 7 of kafka.mystream until the previous envelope has
   * been returned by the choose method.
   */
  void update(IncomingMessageEnvelope envelope);

  /**
   * Return the next envelope to process, or null if no envelope should be
   * processed right now.
   */
  IncomingMessageEnvelope choose();
}
}}}

The manner in which !MessageChooser is used is:

 1. !SystemConsumers buffers messages from all SSPs as they become available.
 2. If !MessageChooser has no messages for a given SSP, and !SystemConsumers has a message in its buffer for the SSP, the !MessageChooser will be updated once with the next message in the buffer.
 3. When SamzaContainer is ready to process another message, it calls !SystemConsumers.choose, which in turn calls !MessageChooser.choose.

The contract between the !MessageChooser and the !SystemConsumers class is:

 * Update can be called multiple times before choose is called.
 * A null return from !MessageChooser.choose means no envelopes should be processed at the moment.
 * A !MessageChooser may elect to return null when choose is called, even if unprocessed messages have been given by the update method.
 * A !MessageChooser will not have any of its in-memory state restored in the event of a failure.
 * Blocking operations (such as Thread.sleep) will block all processing in the entire SamzaContainer.
 * A !MessageChooser should never return the same envelope more than once.
 * Non-deterministic (e.g. time-based) !MessageChoosers are allowed.
 * A !MessageChooser does not need to be thread-safe.

== Problem ==

The !MessageChooser is currently hard-wired to a default !MessageChooser implementation, which uses a round-robin strategy to pick messages from all streams with outstanding messages. This implementation is insufficient.

Problems to think about:

 1. Do we accept that the use cases and strategies listed above are legitimate?
 2. Is the !MessageChooser the appropriate way to support the use cases outlined above?
 3. Is the !MessageChooser's interface correct to support the use cases outlined above?
 4. Can we implement a single default !MessageChooser that solves all of the use cases listed above?
 5. Should the !MessageChooser be pluggable?

The general feedback I've gotten is that question 4 is the most important. I pretty much agree. The default implementation should:

 1. Allow preferring certain streams for the state bootstrap.
 2. Keep all streams of the same priority roughly in sync.

Such an implementation would allow us to support the QoS, bootstrapping, and time-alignment use cases.
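For concreteness, the round-robin strategy used by the current hard-wired chooser can be sketched against the interface above. This is a simplified illustration, not Samza's actual code; the Envelope class below is a hypothetical stand-in for IncomingMessageEnvelope.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for IncomingMessageEnvelope; the real class also
// carries the system, stream, partition, key, and message payload.
class Envelope {
    final String systemStreamPartition; // e.g. "kafka.mystream.7"
    final String offset;

    Envelope(String systemStreamPartition, String offset) {
        this.systemStreamPartition = systemStreamPartition;
        this.offset = offset;
    }
}

// Round-robin sketch: serve envelopes in the order their SSPs were updated.
// It honors the contract above: choose() returns null when nothing is
// buffered, and never returns the same envelope twice.
class RoundRobinChooser {
    private final Queue<Envelope> queue = new ArrayDeque<>();

    void update(Envelope envelope) {
        queue.add(envelope);
    }

    Envelope choose() {
        return queue.poll(); // null means "process nothing right now"
    }
}
```

Because !SystemConsumers delivers at most one outstanding envelope per SSP, a plain FIFO queue is enough to cycle fairly across all SSPs without any per-stream bookkeeping.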
== Half-baked proposal ==

First, some axioms that I believe to be true:

 1. We can't support prioritization based on message content without the !MessageChooser being pluggable.
 2. We can't support time-aligned prioritization without either a pluggable !MessageChooser (that can introspect a message and extract the timestamp), a timestamp field in IncomingMessageEnvelope, or some equivalent way to get message stream-time.
 3. The !MessageChooser must know if a stream is "caught up" or still being bootstrapped when making a decision about which message to process next.
 4. Once a stream is "caught up" (envelope.getOffset == offsetOfLastMessageInStream at least once), we can assume the stream has been fully "bootstrapped".

I believe this set of rules implies that we have to have a pluggable !MessageChooser, unless we decide to add some concept of time to Samza.

=== Bootstrapping ===

To support bootstrapping, I propose that we change the !SystemConsumers behavior slightly. Right now, !SystemConsumers alternates between calling the update and choose methods. We should change !SystemConsumers to only call choose if the !MessageChooser has been updated with envelopes from all SSPs that are not yet at head. If we make this change, then a simple hard-coded priority chooser will allow us to bootstrap simply by prioritizing the bootstrap stream higher than anything else (e.g. currencyCodes=1, transactions=0). If we do this, the chooser will always choose the bootstrap stream, and !SystemConsumers will guarantee that the chooser always has a message from the bootstrap stream until it's at head.

The changes required to do this are:
 1. Add getLastOffset(String streamName, Partition partition): String (*) to the SystemAdmin interface.
 2. Modify SamzaContainer to get the SystemAdmin for each input stream's system.
 3. Modify SamzaContainer to use SystemAdmin.getLastOffset to construct a Map[SystemStreamPartition, String], where the value is the last offset.
 4. Modify !SystemConsumers to take a lastOffsets: Map[SystemStreamPartition, String] parameter in the constructor.
 5. Modify !SystemConsumers to have a streamsYetToHitHead: Set[SystemStreamPartition] variable that is initialized to contain all input system stream partitions.
 6. Modify !SystemConsumers so that chooser.choose is called only if streamsYetToHitHead is empty or the chooser has been given a message from every stream that has yet to hit head.
 7. Modify !SystemConsumers to remove chooser.choose.getSystemStreamPartition from streamsYetToHitHead whenever lastOffsets(envelope.getSystemStreamPartition).equals(envelope.getOffset).
 8. Modify !SystemConsumers.register to remove the SystemStreamPartition from streamsYetToHitHead if lastReadOffset.equals(lastOffsets(systemStreamPartition)).
 9. Add a getLastOffset(String streamName, Partition partition): String method to KafkaSystemAdmin.

(*) Note: we might want to batch these offset requests into getLastOffset(Set[SystemStreamPartition] systemStreamPartitions): Map[SystemStreamPartition, String].

=== QoS ===

This is actually the easiest use case. Again, I think a simple priority chooser will take care of it. We don't even need to change the !SystemConsumers behavior to support this use case. If we set the priority for real-time=1 and batch-stream=0, then real-time messages will always be processed before batch messages, provided that they're available. If we make the change to !SystemConsumers outlined in the bootstrapping section, this will also guarantee that, when a processor is stopped and started, the real-time stream will be fully caught up to "head" before any batch messages are processed.

=== Time alignment ===

As mentioned in the axiom list above, I don't think we can support time-aligned streams without either adding a concept of time to Samza or allowing the !MessageChooser to introspect the message. I am abandoning support for this feature in the default message chooser. Instead, I think we should just support pluggable !MessageChoosers, and allow developers to implement their own time-aligned !MessageChoosers.
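As a sketch of what such a user-implemented time-aligned chooser might look like, assuming the job can extract a stream-time timestamp from its own messages (TimedEnvelope and its timestampMs field are illustrative, not Samza API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical envelope that has already had a timestamp extracted from the
// message payload; a real chooser would deserialize the message itself.
class TimedEnvelope {
    final String ssp; // system/stream/partition name, e.g. "kafka.views.0"
    final long timestampMs;

    TimedEnvelope(String ssp, long timestampMs) {
        this.ssp = ssp;
        this.timestampMs = timestampMs;
    }
}

// Time-aligned strategy: always return the buffered envelope with the
// smallest message timestamp, keeping all input streams roughly in sync.
class TimeAlignedChooser {
    // At most one outstanding envelope per SSP, keyed by SSP name.
    private final Map<String, TimedEnvelope> buffered = new HashMap<>();

    void update(TimedEnvelope envelope) {
        buffered.put(envelope.ssp, envelope);
    }

    TimedEnvelope choose() {
        TimedEnvelope oldest = null;
        for (TimedEnvelope e : buffered.values()) {
            if (oldest == null || e.timestampMs < oldest.timestampMs) {
                oldest = e;
            }
        }
        if (oldest != null) {
            buffered.remove(oldest.ssp);
        }
        return oldest; // null when no envelopes are buffered
    }
}
```

Note that a chooser like this is non-deterministic with respect to arrival order, which the contract above explicitly allows.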
A question worth asking is: can we make the default chooser support a poor man's version of this? Maybe we can't align directly by stream-time, but other proxies are:

 * Choose the message with the smallest offset.
 * Choose the message from the stream with the most unprocessed messages.
 * Choose the message from the stream that is the farthest percent behind head.

==== Offsets ====

Offset is the only proxy we could support with no API changes. The problems with supporting offset-aligned streams are:

 1. Different stream systems might have different offset styles. How do you sort between two entirely different offset styles?
 2. Offsets are represented as strings, while Kafka offsets are longs. Sorting strings that contain longs using lexicographic sort leads to "2" > "100".

Additionally, the behavior of a chooser that chooses messages by lowest offset is somewhat unintuitive. If you have a Kafka stream X with current offset 1 billion, and a new Kafka stream Y with offset 0, prioritizing by offset effectively means that you are always going to take messages from stream Y until its offset number catches up to stream X, which might never happen if stream X gets more messages/sec than stream Y.

==== Messages behind head ====

I need to think about this a bit more.

==== Percent behind head ====

I need to think about this a bit more.
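The lexicographic-sort problem called out under Offsets above is easy to demonstrate:

```java
// Offsets travel through the API as strings, but Kafka offsets are longs.
// Comparing the strings lexicographically orders "2" after "100", so a
// chooser picking the "smallest" offset string would misjudge which stream
// is actually behind.
class OffsetSortDemo {
    static boolean lexicographicallySmaller(String a, String b) {
        return a.compareTo(b) < 0;
    }

    static boolean numericallySmaller(String a, String b) {
        return Long.parseLong(a) < Long.parseLong(b);
    }
}
```

Parsing the strings as longs fixes the sort for Kafka, but only works if every system's offsets happen to be numeric, which is exactly problem 1 above.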
=== DefaultChooser ===

I think the default chooser should be a simple hard-coded priority chooser, which lets you say, "If you have messages from stream X, always read them before messages from stream Y." This can be implemented with a config:

{{{
task.chooser.priorities.kafka.currencyCode=1
task.chooser.priorities.kafka.transactions=0
}}}

The convention is task.chooser.priorities.<system>.<stream>=<priority>, where higher priority streams get processed before lower priority streams.

In cases where two streams have the same priority, we need to implement a strategy (either time-aligned, round robin, or some proxy for time-aligned). I haven't come up with an opinion on which strategy we should use yet.

=== !MessageChooser interface ===

I think the !MessageChooser interface is fine as it is. Initially, I wanted to add register/start/stop methods to it.

The main motivation for start/stop is to allow developers to set up a client that queries some outside service to make picking decisions. I'm not saying that this is advisable, but I know people will try to do it. Without stop, there's no way to shut down the client when the service stops. If we assume the service never stops, then this isn't a problem, but if there is a definite "end" to the processor (i.e. TaskCoordinator.shutdown), then the chooser needs a graceful shutdown.

The motivation for register is that there are situations where you want to initialize your data structures (or whatever) on startup before any messages are received. Letting the !MessageChooser know which SystemStreamPartitions it's going to be receiving messages from just seems like a good idea.

I have since backed off on this idea, since I can't come up with a good concrete example of why we need them, and none of the reference implementations we've written so far would need them.
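The priority convention described under DefaultChooser above could be sketched as follows. This is an illustration only: the PrioritizedEnvelope type is hypothetical, the config is taken as a flat string map, and streams of equal priority are served FIFO as a stand-in for the undecided tie-breaking strategy.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical envelope that knows which system.stream it came from.
class PrioritizedEnvelope {
    final String systemStream; // e.g. "kafka.currencyCode"

    PrioritizedEnvelope(String systemStream) {
        this.systemStream = systemStream;
    }
}

// Priority sketch: parse task.chooser.priorities.<system>.<stream>=<priority>
// from config, then always drain the highest-priority non-empty queue first.
class PriorityChooser {
    private final Map<String, Integer> priorities = new HashMap<>();
    // One FIFO queue per priority level (FIFO is the placeholder tie-breaker).
    private final Map<Integer, Queue<PrioritizedEnvelope>> buffered = new HashMap<>();

    PriorityChooser(Map<String, String> config) {
        String prefix = "task.chooser.priorities.";
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                priorities.put(entry.getKey().substring(prefix.length()),
                        Integer.parseInt(entry.getValue()));
            }
        }
    }

    void update(PrioritizedEnvelope envelope) {
        int priority = priorities.getOrDefault(envelope.systemStream, 0);
        buffered.computeIfAbsent(priority, p -> new ArrayDeque<>()).add(envelope);
    }

    PrioritizedEnvelope choose() {
        return buffered.entrySet().stream()
                .filter(e -> !e.getValue().isEmpty())
                .max(Map.Entry.comparingByKey()) // highest priority wins
                .map(e -> e.getValue().poll())
                .orElse(null); // null: no envelopes buffered
    }
}
```

With the currencyCode=1 / transactions=0 config from the section above, this chooser always returns currencyCode envelopes while any are buffered, which, combined with the !SystemConsumers change from the bootstrapping section, yields the bootstrap behavior described there.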
