Dear Wiki user, You have subscribed to a wiki page or wiki category on "Samza Wiki" for change notification.
The "Pluggable MessageChooser" page has been changed by ChrisRiccomini:
https://wiki.apache.org/samza/Pluggable%20MessageChooser?action=diff&rev1=4&rev2=5

- = Pluggable !MessageChooser =
+ = Pluggable MessageChooser =
  == Introduction ==
- When a Samza job is reading messages from more than one system/stream/partition (SSP), a decision needs to be made about the order in which the messages from these SSPs are processed. For example, if a message is available for (stream: AdViewEvent, partition: 7), and so is a message for (stream: AdClickEvent, partition: 7), which message should the SamzaContainer process next?
+ When a Samza job is reading messages from more than one system/stream/partition (SSP), a decision needs to be made about the order in which the messages from these SSPs are processed. For example, if a message is available for (stream: !AdViewEvent, partition: 7), and so is a message for (stream: !AdClickEvent, partition: 7), which message should the !SamzaContainer process next?
  It turns out, the order to process messages can be determined based on any number of strategies:
@@ -31, +31 @@
  Some Samza jobs wish to fully consume a stream from offset 0 all the way through to the last message in the stream before they process messages from any other stream. This is useful for streams that have some key-value data that a Samza job wishes to use when processing messages from another stream.
- Consider a case where you want to read a currencyCode stream, which has mappings of country code (e.g. USD) to symbols (e.g. $), and is partitioned by country code. You might want to join these symbols to a stream called transactions which is also partitioned by currency, and has a schema like {"country": "USD", "amount": 1234}. You could then have your StreamTask join the currency symbol to each transaction message, and emit messages like {"amount": "$1234"}.
+ Consider a case where you want to read a currencyCode stream, which has mappings of country code (e.g.
USD) to symbols (e.g. $), and is partitioned by country code. You might want to join these symbols to a stream called transactions which is also partitioned by currency, and has a schema like {"country": "USD", "amount": 1234}. You could then have your !StreamTask join the currency symbol to each transaction message, and emit messages like {"amount": "$1234"}.
  To bootstrap the currencyCode stream, you need to read it from offset 0 all the way to the last message in the stream (what I'm calling head). It is not desirable to read any message from the transactions stream until the currencyCode stream has been fully read, or else you might try to join a transaction message to a country code that hasn't yet been read.
  ==== Time-aligned streams ====
- Some Samza jobs wish to keep their input streams as time-aligned as possible. This allows lets StreamTasks to have a smaller buffer when trying to joins or buffered sorts.
+ Some Samza jobs wish to keep their input streams as time-aligned as possible. This allows lets !StreamTasks to have a smaller buffer when trying to joins or buffered sorts.
- Imagine that a Samza job is reading from two streams: AdViews and AdClicks. It's joining AdViews and AdClicks to produce a click-through stream that defines the % of views that had a click. This Samza job would buffer all AdViews for some period of time (say 5 minutes). As AdClicks come in, the job would join the AdView with the AdClick, and emit a message like (ad-id: 1234, click: true). Any AdViews that are in the window for more than 5 minutes are assumed to have no click, and messages are sent with (ad-id: 2345, click: false).
+ Imagine that a Samza job is reading from two streams: !AdViews and !AdClicks. It's joining !AdViews and !AdClicks to produce a click-through stream that defines the % of views that had a click. This Samza job would buffer all !AdViews for some period of time (say 5 minutes).
As !AdClicks come in, the job would join the !AdView with the !AdClick, and emit a message like (ad-id: 1234, click: true). Any !AdViews that are in the window for more than 5 minutes are assumed to have no click, and messages are sent with (ad-id: 2345, click: false).
- The problem that this kind of job runs into is that it wants to keep the AdView events and AdClick events as closely aligned (based on the time the messages were sent) as possible, because it allows the job to shrink the buffering window, which frees up memory and disk space. If you can keep AdView and AdClick messages within 2 minutes of each other, you only need a 2 minute buffer. It doesn't make any sense to process and buffer an AdView event from two seconds ago if an AdClick from 1 minute ago can be processed and its AdView can be removed from the buffer (thereby freeing up space).
+ The problem that this kind of job runs into is that it wants to keep the !AdView events and !AdClick events as closely aligned (based on the time the messages were sent) as possible, because it allows the job to shrink the buffering window, which frees up memory and disk space. If you can keep !AdView and !AdClick messages within 2 minutes of each other, you only need a 2 minute buffer. It doesn't make any sense to process and buffer an !AdView event from two seconds ago if an !AdClick from 1 minute ago can be processed and its !AdView can be removed from the buffer (thereby freeing up space).
- === !MessageChooser ===
+ === MessageChooser ===
  In Samza 0.7.0 we've introduced a class called !MessageChooser. This is an object that is given up to one message at a time from each input stream/partition pair (when they're available). The chooser's job is to return the incoming message envelope that should be processed next.
- In plain-spoken English, Samza says, "I have IncomingMessageEnvelopes from stream/partition X, and stream/partitionY, which should I process next?"
It's the !MessageChooser's job to answer this question.
+ In plain-spoken English, Samza says, "I have !IncomingMessageEnvelopes from stream/partition X, and stream/partitionY, which should I process next?" It's the !MessageChooser's job to answer this question.
  The !MessageChooser's interface currently looks like this:
  {{{
- interface !MessageChooser {
+ interface MessageChooser {
    /**
     * Notify the chooser that a new envelope is available for a processing. A
-    * !MessageChooser will receive, at most, one outstanding envelope per
+    * MessageChooser will receive, at most, one outstanding envelope per
     * system/stream/partition combination. For example, if update is called for
     * partition 7 of kafka.mystream, then update will not be called with an
     * envelope from partition 7 of kafka.mystream until the previous envelope has
@@ -65, +65 @@
     * @param envelope
     *          An unprocessed envelope.
     */
-   void update(IncomingMessageEnvelope envelope);
+   void update(!IncomingMessageEnvelope envelope);
    /**
     * The choose method is invoked when the SamzaContainer is ready to process a
@@ -76, +76 @@
     * @return The next envelope to process, or null if the chooser has no
     *         messages or doesn't want to process any at the moment.
     */
-   IncomingMessageEnvelope choose();
+   !IncomingMessageEnvelope choose();
  }
  }}}
@@ -84, +84 @@
  1. !SystemConsumers buffers messages from all SSPs as they become available.
  2. If !MessageChooser has no messages for a given SSP, and !SystemConsumers has a message in its buffer for the SSP, the !MessageChooser will be updated once with the next message in the buffer.
- 3. When SamzaContainer is ready to process another message, it calls !SystemConsumers.choose, which in-turn calls !MessageChooser.choose.
+ 3. When !SamzaContainer is ready to process another message, it calls !SystemConsumers.choose, which in-turn calls !MessageChooser.choose.
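
Editorial sketch (not part of the wiki page): the three steps above can be illustrated with a small, self-contained Java example. Everything here is an assumption made for illustration: `Envelope` is a stripped-down stand-in for IncomingMessageEnvelope, `Chooser` mirrors the two-method interface quoted above, `FifoChooser` is one possible strategy, and `MiniConsumers` is a toy version of the buffering role described for SystemConsumers. None of these are the actual Samza classes.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical stand-in for IncomingMessageEnvelope: just an SSP name and an offset.
final class Envelope {
    final String ssp;    // e.g. "kafka.AdViewEvent.7" (illustrative naming)
    final String offset;
    Envelope(String ssp, String offset) { this.ssp = ssp; this.offset = offset; }
}

// Same two-method shape as the MessageChooser interface quoted in the page.
interface Chooser {
    void update(Envelope envelope);
    Envelope choose();
}

// One possible strategy: hand envelopes back in the order they were offered.
final class FifoChooser implements Chooser {
    private final Queue<Envelope> pending = new ArrayDeque<>();
    @Override public void update(Envelope envelope) { pending.add(envelope); }
    @Override public Envelope choose() { return pending.poll(); } // null when empty
}

// Toy model of the buffering role described in steps 1 to 3.
final class MiniConsumers {
    private final Chooser chooser;
    // LinkedHashMap keeps SSP iteration order stable, which makes the sketch deterministic.
    private final Map<String, Queue<Envelope>> buffers = new LinkedHashMap<>();
    // SSPs for which the chooser currently holds an unprocessed envelope.
    private final Set<String> outstanding = new HashSet<>();

    MiniConsumers(Chooser chooser) { this.chooser = chooser; }

    // Step 1: buffer messages from all SSPs as they become available.
    void receive(Envelope envelope) {
        buffers.computeIfAbsent(envelope.ssp, k -> new ArrayDeque<>()).add(envelope);
    }

    Envelope choose() {
        // Step 2: update the chooser once for each SSP it holds no envelope for.
        for (Map.Entry<String, Queue<Envelope>> entry : buffers.entrySet()) {
            if (!outstanding.contains(entry.getKey()) && !entry.getValue().isEmpty()) {
                chooser.update(entry.getValue().poll());
                outstanding.add(entry.getKey());
            }
        }
        // Step 3: delegate the decision to the chooser; null means "nothing right now".
        Envelope chosen = chooser.choose();
        if (chosen != null) {
            outstanding.remove(chosen.ssp);
        }
        return chosen;
    }
}
```

Because the chooser is given at most one outstanding envelope per SSP, even this FIFO strategy ends up alternating between SSPs that both have messages buffered, rather than draining one SSP first.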
  The contract between the !MessageChooser and the !SystemConsumers class is:
@@ -92, +92 @@
  * A null return from !MessageChooser.choose means no envelopes should be processed at the moment.
  * A !MessageChooser may elect to return null when choose is called, even if unprocessed messages have been given by the update method.
  * A !MessageChooser will not have any of its in-memory state restored in the event of a failure.
- * Blocking operations (such as Thread.sleep) will block all processing in the entire SamzaContainer.
+ * Blocking operations (such as Thread.sleep) will block all processing in the entire !SamzaContainer.
  * A !MessageChooser should never return the same envelope more than once.
  * Non-deterministic (e.g. time-based) !MessageChoosers are allowed.
  * A !MessageChooser does not need to be thread-safe.
@@ -116, +116 @@
  Such an implementation would allow us to support the QoS, bootstrapping, and time-alignment use cases.
- == Half-baked proposal ==
+ == Proposal ==
  First, some Axioms that I believe to be true:
  1. We can't support prioritization based on message content without the !MessageChooser being pluggable.
- 2. We can't support time-aligned prioritization without either a pluggable !MessageChooser (that can introspect a message and extract the timestamp), a timestamp field in IncomingMessageEnvelope, or some equivalent way to get message stream-time.
+ 2. We can't support time-aligned prioritization without either a pluggable !MessageChooser (that can introspect a message and extract the timestamp), a timestamp field in !IncomingMessageEnvelope, or some equivalent way to get message stream-time.
  3. The !MessageChooser must know if a stream is "caught up" or still being bootstrapped when making a decision about which message to process next.
  4. Once a stream is "caught up" (envelope.getOffset == offsetOfLastMessageInStream at least once), we can assume the stream has been fully "bootstrapped".
@@ -133, +133 @@
  The changes required to do this are:
- 1.
Add getLastOffset(String streamName, Partition partition): String (*) to SystemAdmin interface.
+ 1. Add getLastOffset(String streamName, Partition partition): String (*) to !SystemAdmin interface.
- 2. Modify SamzaContainer to get SystemAdmin for input stream's system.
+ 2. Modify !SamzaContainer to get !SystemAdmin for input stream's system.
- 3. Modify SamzaContainer to use SystemAdmins.getLastOffset to construct a Map[SystemStreamPartition, String], where value is last offset.
+ 3. Modify !SamzaContainer to use !SystemAdmins.getLastOffset to construct a Map[!SystemStreamPartition, String], where value is last offset.
- 4. Modify !SystemConsumers to take a lastOffsets: Map[SystemStreamPartition, String] parameter in the constructor.
+ 4. Modify !SystemConsumers to take a lastOffsets: Map[!SystemStreamPartition, String] parameter in the constructor.
- 5. Modify !SystemConsumers to have a streamsYetToHitHead: Set[SystemStreamPartition] variable that is initialized to contain all input system stream partitions.
+ 5. Modify !SystemConsumers to have a streamsYetToHitHead: Set[!SystemStreamPartition] variable that is initialized to contain all input system stream partitions.
- 6. Modify !SystemConsumers so that chooser.choose is called only if streamsYetToHitHead.length == 0 or the chooser has been given a message from every stream that has yet to hit head.
+ 6. Modify !SystemConsumers so that chooser.choose is called only if streamsYetToHitHead.length == 0 or the chooser has been given a message from every stream that has yet to hit head.
- 7. Modify !SystemConsumers to remove chooser.choose.getSystemStreamPartition from streamsYetToHitHead whenever lastOffsets(envelope.getSystemStreamPartition).equals(envelope.getOffset)
+ 7. Modify !SystemConsumers to remove chooser.choose.getSystemStreamPartition from streamsYetToHitHead whenever lastOffsets(envelope.getSystemStreamPartition).equals(envelope.getOffset)
- 8.
Modify !SystemConsumers.register to remove SystemStreamPartition from streamsYetToHitHead if lastReadOffset.equals(lastOffsets(systemStreamPartition))
+ 8. Modify !SystemConsumers.register to remove !SystemStreamPartition from streamsYetToHitHead if lastReadOffset.equals(lastOffsets(!SystemStreamPartition))
- 8. Add a getLastOffset(String streamName, Partition partition): String method to KafkaSystemAdmin
+ 9. Add a getLastOffset(String streamName, Partition partition): String method to !KafkaSystemAdmin
- * Note, we might want to batch these offset requests to getLastOffset(Set[SystemStreamPartition] systemStreamPartition): Map[SystemStreamPartition, String]
+ * Note, we might want to batch these offset requests to getLastOffset(Set[!SystemStreamPartition] !SystemStreamPartition): Map[!SystemStreamPartition, String]
  === QoS ===
@@ -189, +189 @@
  In cases where two streams have the same priority, we need to implement a strategy (either time-aligned, round robin, or some proxy for time-aligned). I haven't come up with an opinion on which strategy we should use yet.
- === !MessageChooser interface ===
+ === MessageChooser interface ===
  I think the !MessageChooser interface is fine as it is. Initially, I wanted to add register/start/stop methods to it.
- The main motivation for start/stop is allow developers to setup a client that queries some outside service to make picking decisions. I'm not saying that this is advisable, but I know people will try and do it. Without stop, there's no way to shut down the client when the service stops. If we assume the service never stops, then this isn't a problem, but if there is a definite "end" to the processor (i.e. TaskCoordinator.shutdown), then the chooser needs a graceful shutdown.
+ The main motivation for start/stop is that it allows developers to setup a client that queries some outside service to make picking decisions. I'm not saying that this is advisable, but I know people will try and do it.
Without stop, there's no way to shut down the client when the service stops. If we assume the service never stops, then this isn't a problem, but if there is a definite "end" to the processor (i.e. !TaskCoordinator.shutdown), then the chooser needs a graceful shutdown.
- The motivation for register is that there are situations where you want to initialize your data structures (or whatever) on startup before any messages are received. Letting the !MessageChooser know which SystemStreamPartitions it's going to be receiving messages from just seems like a good idea..
+ The motivation for register is that there are situations where you want to initialize your data structures (or whatever) on startup before any messages are received. Letting the !MessageChooser know which !SystemStreamPartitions it's going to be receiving messages from just seems like a good idea..
  I have since backed off on this idea since I can't come up with a good concrete example of why we need them, and all of the reference implementations we've written so far wouldn't need them.
+ == Open questions ==
+
+ A (probably partial) list of open questions:
+
+ 1. How should we prioritize streams that are the same priority in the !DefaultChooser?
+ 2. Should we add a concept of time to Samza?
+ 3. What is the behavior of aligning streams by messages behind high watermark, or percent behind high watermark?
+
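
Editorial sketch (not part of the wiki page): the bookkeeping described in changes 5 through 8 of the proposal (the streamsYetToHitHead set and the rule for when chooser.choose may be called) might look roughly like the Java below. This is a minimal illustration under stated assumptions: `BootstrapGate` and its method names are hypothetical, SSPs are represented as plain strings, and offsets are compared with `equals` exactly as the proposal writes it. It is not the Samza implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper that tracks which SSPs have not yet reached head.
final class BootstrapGate {
    // Last offset per SSP, as would be returned by the proposed getLastOffset.
    private final Map<String, String> lastOffsets;
    // Change 5: starts out containing every input SSP.
    private final Set<String> streamsYetToHitHead;
    // SSPs for which the chooser currently holds an envelope.
    private final Set<String> givenToChooser = new HashSet<>();

    BootstrapGate(Map<String, String> lastOffsets) {
        this.lastOffsets = new HashMap<>(lastOffsets);
        this.streamsYetToHitHead = new HashSet<>(lastOffsets.keySet());
    }

    // Change 8: an SSP whose last read offset already equals the last offset is at head.
    void register(String ssp, String lastReadOffset) {
        if (lastReadOffset != null && lastReadOffset.equals(lastOffsets.get(ssp))) {
            streamsYetToHitHead.remove(ssp);
        }
    }

    // Record that the chooser was updated with an envelope from this SSP.
    void onUpdate(String ssp) {
        givenToChooser.add(ssp);
    }

    // Change 6: choose only when no SSP is lagging, or the chooser holds a
    // message from every lagging SSP. containsAll is true for an empty set.
    boolean mayChoose() {
        return givenToChooser.containsAll(streamsYetToHitHead);
    }

    // Change 7: once the chosen envelope's offset equals the last offset,
    // the SSP is considered caught up.
    void onChosen(String ssp, String offset) {
        givenToChooser.remove(ssp);
        if (offset.equals(lastOffsets.get(ssp))) {
            streamsYetToHitHead.remove(ssp);
        }
    }
}
```

Note that a gate like this only guarantees the chooser can see a message from every stream that is still catching up before it is asked to pick; which of those messages to pick first remains the chooser's decision.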
