Dear Wiki user, You have subscribed to a wiki page or wiki category on "Samza Wiki" for change notification.
The "Pluggable MessageChooser" page has been changed by ChrisRiccomini: https://wiki.apache.org/samza/Pluggable%20MessageChooser?action=diff&rev1=2&rev2=3

=== Use cases ===

- ==== Streams with different SLAs ====
+ ==== Streams with different latency SLAs ====
+ 
+ Some Samza jobs consume two streams: one stream is fed by a real-time system and the other stream is fed by a batch system. A typical pattern is to have a Samza processor with a statistical model that is ranking a real-time feed of data. Periodically, this model needs to be retrained and updated. The Samza processor can be re-deployed with the new model, but how do you re-process all of the old data that the processor has already seen? This can be accomplished by having a batch system send messages to the Samza processor for any data that needs to be re-processed.
+ 
+ A concrete example of this is a Samza processor that is determining which country a user is coming from when they sign up for a website. This processor receives "please classify user" messages, and emits "user belongs to country X" messages.
+ 
+ Initially, the processor might simply use the user's IP address to determine which country they are located in. At some point in the future, the processor could be updated to use not only the IP address of the user, but also the user's language setting. This new strategy leads to much more accurate classification, and we now want to re-process all of the old users using the new model. How? A naive approach would be to re-process all of the old users in an offline system like Hadoop, and then push the computed data to the same store that the Samza processor is writing to. This is not desirable because it means that you have to maintain your classification logic in two places. A better approach is to have the batch system simply re-send "please classify user" messages for all users that need to be re-classified.
+ 
+ The problem with the second approach is that it leads to a situation where the batch system can produce thousands or even millions of messages per second. We don't want to starve the real-time system, whose traffic is much higher priority: the users have just signed up, and are probably actively browsing the website while the message is being processed. An optimal prioritization between these two streams would be to always process messages from the real-time system when they are available, and only process messages from the batch system when no real-time messages are available to process.

==== Bootstrapping state streams ====

+ Some Samza jobs wish to fully consume a stream from offset 0 all the way through to the last message in the stream before they process messages from any other stream. This is useful for streams that carry key-value data that a Samza job wishes to use when processing messages from another stream.
+ 
+ Consider a case where you want to read a currencyCode stream, which has mappings of currency code (e.g. USD) to symbol (e.g. $), and is partitioned by currency code. You might want to join these symbols to a stream called transactions, which is also partitioned by currency code, and has a schema like {"currency": "USD", "amount": 1234}. You could then have your StreamTask join the currency symbol to each transaction message, and emit messages like {"amount": "$1234"}.
+ 
+ To bootstrap the currencyCode stream, you need to read it from offset 0 all the way to the last message in the stream (what I'm calling head). It is not desirable to read any message from the transactions stream until the currencyCode stream has been fully read, or else you might try to join a transaction message to a currency code that hasn't yet been read.
+ 
+ ==== Time-aligned streams ====
+ 
+ Some Samza jobs wish to keep their input streams as time-aligned as possible. This lets StreamTasks keep a smaller buffer when doing joins or buffered sorts.
+ 
+ Imagine that a Samza job is reading from two streams: AdViews and AdClicks. It joins AdViews to AdClicks to produce a click-through stream that gives the % of views that received a click. This Samza job would buffer all AdViews for some period of time (say 5 minutes). As AdClicks come in, the job would join each AdView with its AdClick, and emit a message like (ad-id: 1234, click: true). Any AdViews that sit in the window for more than 5 minutes are assumed to have no click, and messages are sent with (ad-id: 2345, click: false).
+ 
+ The problem that this kind of job runs into is that it wants to keep the AdView and AdClick events as closely aligned (based on the time the messages were sent) as possible, because that allows the job to shrink the buffering window, which frees up memory and disk space. If you can keep AdView and AdClick messages within 2 minutes of each other, you only need a 2-minute buffer. It doesn't make any sense to process and buffer an AdView event from two seconds ago if an AdClick from 1 minute ago can be processed and its AdView removed from the buffer (thereby freeing up space).

=== MessageChooser ===
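+ As a rough sketch of how the first use case (prioritizing a real-time stream over a batch stream) might map onto a pluggable chooser, consider the following. The Envelope type, the stream names, and the update/choose method pair here are simplified stand-ins for illustration, not the actual Samza API:

{{{
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a Samza message envelope: just a stream name and a payload.
class Envelope {
    final String stream;
    final String message;
    Envelope(String stream, String message) { this.stream = stream; this.message = message; }
}

// Hypothetical chooser contract: the container offers messages via update(),
// and asks which one to process next via choose().
interface MessageChooser {
    void update(Envelope envelope); // buffer a message that is available to process
    Envelope choose();              // return the next message to process, or null if none
}

// Always drains the real-time stream first; batch messages are only chosen
// when no real-time messages are buffered, so batch traffic can never starve
// the real-time feed.
class PriorityChooser implements MessageChooser {
    private final Deque<Envelope> realtime = new ArrayDeque<>();
    private final Deque<Envelope> batch = new ArrayDeque<>();
    private final String realtimeStream;

    PriorityChooser(String realtimeStream) { this.realtimeStream = realtimeStream; }

    public void update(Envelope envelope) {
        if (envelope.stream.equals(realtimeStream)) realtime.add(envelope);
        else batch.add(envelope);
    }

    public Envelope choose() {
        if (!realtime.isEmpty()) return realtime.poll();
        return batch.poll(); // null when nothing at all is available
    }
}
}}}

+ With this shape, the starvation problem above reduces to an ordering decision inside choose(): the batch queue may grow while real-time traffic is heavy, but the real-time stream is always serviced first.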

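+ The bootstrapping use case can be sketched in the same style, assuming the consumer can tell the chooser when the bootstrap stream has been read up to head. The atHead flag and the envelope shape are hypothetical, purely for illustration:

{{{
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical envelope: stream name, payload, and a flag set by the consumer
// when this message is the last one currently in the stream (i.e. at head).
class BootstrapEnvelope {
    final String stream;
    final String message;
    final boolean atHead;
    BootstrapEnvelope(String stream, String message, boolean atHead) {
        this.stream = stream; this.message = message; this.atHead = atHead;
    }
}

// Holds back every other stream until the bootstrap stream (e.g. currencyCode)
// has been fully read from offset 0 to head.
class BootstrapChooser {
    private final String bootstrapStream;
    private final Deque<BootstrapEnvelope> bootstrapBuffer = new ArrayDeque<>();
    private final Deque<BootstrapEnvelope> otherBuffer = new ArrayDeque<>();
    private boolean bootstrapped = false;

    BootstrapChooser(String bootstrapStream) { this.bootstrapStream = bootstrapStream; }

    void update(BootstrapEnvelope envelope) {
        if (envelope.stream.equals(bootstrapStream)) {
            bootstrapBuffer.add(envelope);
            if (envelope.atHead) bootstrapped = true; // caught up to head
        } else {
            otherBuffer.add(envelope);
        }
    }

    BootstrapEnvelope choose() {
        if (!bootstrapBuffer.isEmpty()) return bootstrapBuffer.poll();
        // Refuse to hand out e.g. transactions messages until the bootstrap
        // stream is fully consumed, so joins never miss a currency mapping.
        return bootstrapped ? otherBuffer.poll() : null;
    }
}
}}}

+ In the currencyCode/transactions example above, this guarantees that no transaction is joined before every currency-to-symbol mapping has been read.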