Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Samza Wiki" for change 
notification.

The "Pluggable MessageChooser" page has been changed by ChrisRiccomini:
https://wiki.apache.org/samza/Pluggable%20MessageChooser?action=diff&rev1=1&rev2=2

- Initial setup.
+ = Pluggable MessageChooser =
  
+ == Introduction ==
+ 
+ When a Samza job is reading messages from more than one 
system/stream/partition (SSP), a decision needs to be made about the order in 
which the messages from these SSPs are processed.  For example, if a message is 
available for (stream: AdViewEvent, partition: 7), and so is a message for 
(stream: AdClickEvent, partition: 7), which message should the SamzaContainer 
process next?
+ 
+ It turns out that the order in which messages are processed by the SamzaContainer impacts a number of use cases.
+ 
+ === Use cases ===
+ 
+ ==== Streams with different SLAs ====
+ 
+ ==== Bootstrapping state streams ====
+ 
+ ==== Time-aligned streams ====
+ 
+ === MessageChooser ===
+ 
+ In Samza 0.7.0 we've introduced an interface called MessageChooser. A MessageChooser is given up to one message at a time from each input stream/partition pair (when messages are available). The chooser's job is to return the incoming message envelope that should be processed next.
+ 
+ In plain-spoken English, Samza says, "I have IncomingMessageEnvelopes from stream/partition X and stream/partition Y; which should I process next?" It's the MessageChooser's job to answer this question.
+ 
+ The MessageChooser's interface currently looks like this:
+ 
+ {{{
+ interface MessageChooser {
+   /**
+    * Notify the chooser that a new envelope is available for processing. A
+    * MessageChooser will receive, at most, one outstanding envelope per
+    * system/stream/partition combination. For example, if update is called
+    * for partition 7 of kafka.mystream, then update will not be called with
+    * an envelope from partition 7 of kafka.mystream until the previous
+    * envelope has been returned via the choose method. Update will only be
+    * invoked after the chooser has been started.
+    *
+    * @param envelope
+    *          An unprocessed envelope.
+    */
+   void update(IncomingMessageEnvelope envelope);
+ 
+   /**
+    * The choose method is invoked when the SamzaContainer is ready to process
+    * a new message. The chooser may elect to return any envelope that it's
+    * been given via the update method, and which hasn't yet been returned.
+    * Choose will only be called after the chooser has been started.
+    *
+    * @return The next envelope to process, or null if the chooser has no
+    *         messages or doesn't want to process any at the moment.
+    */
+   IncomingMessageEnvelope choose();
+ }
+ }}}
+ 
+ The MessageChooser is used as follows:
+ 
+  1. SystemConsumers buffers messages from all SSPs as they become available.
+  2. If the MessageChooser has no outstanding envelope for a given SSP, and SystemConsumers has a message in its buffer for that SSP, the MessageChooser is updated once with the next message in the buffer.
+  3. When the SamzaContainer is ready to process another message, it calls SystemConsumers.choose, which in turn calls MessageChooser.choose.
+ 
+ The contract between the MessageChooser and the SystemConsumers class is:
+ 
+  * Update can be called multiple times before choose is called.
+  * A null return from MessageChooser.choose means no envelopes should be 
processed at the moment.
+  * A MessageChooser may elect to return null when choose is called, even if 
unprocessed messages have been given by the update method.
+  * A MessageChooser will not have any of its in-memory state restored in the 
event of a failure.
+  * Blocking operations (such as Thread.sleep) will block all processing in 
the entire SamzaContainer.
+  * A MessageChooser should never return the same envelope more than once.
+  * Non-deterministic (e.g. time-based) MessageChoosers are allowed.
+  * A MessageChooser does not need to be thread-safe.
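+ 
+ As an illustration of this contract (not the actual DefaultChooser source), a minimal chooser could buffer at most one envelope per SSP and hand envelopes back in round-robin order. The class below is a hypothetical sketch that assumes only the MessageChooser interface above and the IncomingMessageEnvelope/SystemStreamPartition classes:
+ 
+ {{{
+ // Illustrative sketch only; not the actual DefaultChooser implementation.
+ import java.util.LinkedHashMap;
+ import java.util.Map;
+ 
+ import org.apache.samza.system.IncomingMessageEnvelope;
+ import org.apache.samza.system.SystemStreamPartition;
+ 
+ public class RoundRobinChooserSketch implements MessageChooser {
+   // At most one outstanding envelope per SSP, per the contract above.
+   // LinkedHashMap keeps arrival order, which yields round-robin behavior.
+   private final Map<SystemStreamPartition, IncomingMessageEnvelope> buffered =
+     new LinkedHashMap<SystemStreamPartition, IncomingMessageEnvelope>();
+ 
+   public void update(IncomingMessageEnvelope envelope) {
+     buffered.put(envelope.getSystemStreamPartition(), envelope);
+   }
+ 
+   public IncomingMessageEnvelope choose() {
+     if (buffered.isEmpty()) {
+       return null; // Null is allowed: nothing to process at the moment.
+     }
+     // Return (and remove) the oldest buffered envelope so it is never
+     // returned more than once.
+     SystemStreamPartition next = buffered.keySet().iterator().next();
+     return buffered.remove(next);
+   }
+ }
+ }}}
+ 
+ Because SystemConsumers gives the chooser at most one envelope per SSP, removing an envelope on choose and re-inserting on the next update yields a roughly fair rotation across SSPs.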
+ 
+ == Problem ==
+ 
+ The MessageChooser is currently hard-wired to a default MessageChooser 
implementation, which uses a round-robin strategy to pick messages from all 
streams with outstanding messages. This implementation is insufficient for a 
number of use cases.
+ 
+ === Problems ===
+ 
+ Questions worth asking are:
+ 
+ 1. Is the MessageChooser the appropriate way to support the use cases 
outlined above?
+ 2. Can we implement a single default MessageChooser that solves all of the 
use cases listed above?
+ 3. Should the MessageChooser be pluggable?
+ 4. Is the MessageChooser's interface correct?
+ 
+ 
+ 
+ In legacy (non-open-sourced) versions of Samza, there was a configuration called "streams.%s.consumer.max.bytes.per.sec" for throttling the number of bytes the Task would read from a stream. In Samza 0.7.0 (the version currently in Apache Samza's master git repository), this throttling feature has been removed, and we've introduced the concept of a MessageChooser instead. Our justification for removing this style of throttling in 0.7.0 is that what folks usually really want is:
+ 
+  1. Per-stream prioritization (always take messages from stream X before 
messages from stream Y if both have messages available).
+  2. Per-process isolation (I want to know that my process gets at least X % 
of the CPU, disk, network, etc).
+  3. A way to intentionally ignore a stream even if messages are available. 
This can be used to prevent a Samza job from hammering away on a remote 
database or web service. Samza jobs can process messages much faster than a 
remote DB or service might be able to process RPC requests. When a Samza job is 
triggering an RPC call to a remote web service to join data to events, for 
instance, developers want a way to limit processing to prevent overwhelming the 
web service. By intentionally ignoring messages from a stream, you can slow 
down the rate at which the StreamTask gets messages, and thereby slow down the 
rate at which RPC calls are made.
+ 
+ It's not actually the case that people want to voluntarily go slower than 
they can (which is what the old throttler was doing).
+ 
+ For (2), we are going to be relying on CGroups.
+ 
+ For (3), we're adding a new feature described in the [[Throttling]] design 
proposal and [[https://issues.apache.org/jira/browse/SAMZA-24|SAMZA-24]].
+ 
+ This design proposal describes how we will allow developers to define which 
message should be processed next when multiple stream/partitions have 
outstanding messages available.
+ 
+ === Use Case ===
+ 
+ An example use case is a Samza task that consumes multiple streams, where some streams come from live systems with stricter SLA (low-latency) requirements and must always be prioritized over other streams that come from batch systems. The legacy throttling approach we used before (giving the batch system a maximum of 1 MB/s) is not an ideal way to express this type of stream prioritization, because configuring the "batch" streams with a low consumption rate decreases the overall throughput of the system when there is no data in the "live" streams. Furthermore, we might want to throttle each "batch" stream based on external signals that can change over time. Because of the dynamic nature of these external signals, we would like a programmatic interface that can change the prioritization as the signal changes.
+ 
+ Our current 0.7.0 MessageChooser implementation simply round-robins between streams. This has the effect of throttling the "batch" streams described above when messages are available from the "live" streams. When messages are not available from the live streams, the chooser will just choose the batch messages every time, causing the task to process messages from the batch streams as fast as possible.
+ 
+ One caveat here is that the MessageChooser is currently not pluggable. We've 
hard coded this behavior into a "DefaultChooser" for now. The motivation for 
keeping it from being pluggable is that we're not entirely certain of the 
interface yet, and we're also not sure how well it will integrate with our 
fault tolerance and state management semantics (what happens if a pluggable 
MessageChooser is maintaining state, and there's a failure?).
+ 
+ This leads to the question: is round-robin the best approach for a generic message chooser (given that it's not pluggable at this time)? I think the answer is that it's not. A better approach would simply be a priority list that says, "Always process stream X, then stream Y, then stream Z." This would actually be better for the use case above, since the chooser could always process "live" messages first, rather than just round-robin between live and batch streams.
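+ 
+ A sketch of that priority-list idea might look like the following. It is illustrative only: the class name, constructor, and the idea of configuring priorities as an ordered list of stream names are hypothetical, not an existing Samza API.
+ 
+ {{{
+ // Illustrative priority-list chooser, assuming the MessageChooser interface
+ // shown earlier. Nothing here is existing Samza code.
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ 
+ import org.apache.samza.system.IncomingMessageEnvelope;
+ import org.apache.samza.system.SystemStreamPartition;
+ 
+ public class PriorityListChooserSketch implements MessageChooser {
+   // Stream names ordered from highest to lowest priority.
+   private final List<String> streamsInPriorityOrder;
+   private final Map<SystemStreamPartition, IncomingMessageEnvelope> buffered =
+     new HashMap<SystemStreamPartition, IncomingMessageEnvelope>();
+ 
+   public PriorityListChooserSketch(List<String> streamsInPriorityOrder) {
+     this.streamsInPriorityOrder = streamsInPriorityOrder;
+   }
+ 
+   public void update(IncomingMessageEnvelope envelope) {
+     buffered.put(envelope.getSystemStreamPartition(), envelope);
+   }
+ 
+   public IncomingMessageEnvelope choose() {
+     // Always drain higher-priority streams before lower-priority ones.
+     for (String stream : streamsInPriorityOrder) {
+       for (SystemStreamPartition ssp : buffered.keySet()) {
+         if (ssp.getStream().equals(stream)) {
+           return buffered.remove(ssp);
+         }
+       }
+     }
+     return null; // No buffered envelope from any configured stream.
+   }
+ }
+ }}}
+ 
+ With a priority list like ["live", "batch"], such a chooser always drains buffered "live" envelopes before returning any "batch" envelope.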
+ 
+ Another alternative would be to "always process the stream that is farthest behind," for some definition of "farthest behind". Some definitions include:
+ 
+ 1. Farthest behind in wall-clock time (now() - the timestamp in the message).
+ 2. Largest number of messages behind the high watermark (when reading from Kafka).
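+ 
+ A chooser for the wall-clock definition would need some application-specific way to pull an event timestamp out of the message body; the extractTimestamp method in the sketch below is hypothetical and does not exist in Samza:
+ 
+ {{{
+ // Illustrative sketch of the "farthest behind in wall-clock time" idea,
+ // assuming the MessageChooser interface shown earlier. extractTimestamp is
+ // hypothetical and application-specific (e.g. a field in the message body).
+ import java.util.HashMap;
+ import java.util.Map;
+ 
+ import org.apache.samza.system.IncomingMessageEnvelope;
+ import org.apache.samza.system.SystemStreamPartition;
+ 
+ public abstract class OldestMessageFirstChooserSketch implements MessageChooser {
+   private final Map<SystemStreamPartition, IncomingMessageEnvelope> buffered =
+     new HashMap<SystemStreamPartition, IncomingMessageEnvelope>();
+ 
+   /** Pull an event timestamp (epoch millis) out of the message body. */
+   protected abstract long extractTimestamp(Object message);
+ 
+   public void update(IncomingMessageEnvelope envelope) {
+     buffered.put(envelope.getSystemStreamPartition(), envelope);
+   }
+ 
+   public IncomingMessageEnvelope choose() {
+     IncomingMessageEnvelope oldest = null;
+     long oldestTimestamp = Long.MAX_VALUE;
+     // Pick whichever buffered envelope is farthest behind in wall-clock time.
+     for (IncomingMessageEnvelope envelope : buffered.values()) {
+       long timestamp = extractTimestamp(envelope.getMessage());
+       if (timestamp < oldestTimestamp) {
+         oldestTimestamp = timestamp;
+         oldest = envelope;
+       }
+     }
+     if (oldest != null) {
+       buffered.remove(oldest.getSystemStreamPartition());
+     }
+     return oldest;
+   }
+ }
+ }}}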
+ 
+ What's your take on this new 0.7.0 feature? I think the existing MessageChooser would accommodate these needs as-is, but it could be made even better with a priority list.
+ 
+ Questions:
+ 
+ 1. Should the MessageChooser be pluggable?
+ 2. Is the MessageChooser interface correct?
+ 3. What should the default implementation do?
+ 
+ == Potential Solutions ==
+ 
+ == Proposal ==
+ 
+ == Open Questions ==
+ 
+ 
+ 
+ 
+ I believe that this implementation (outlined in the numbered steps below) guarantees the following:
+ 
+ 1. The chooser will have a message from all streams that have not yet hit 
head when choose is called.
+ 2. The main event loop will not be blocked while we are waiting for messages 
from streams that have yet to hit head.
+ 
+ The first guarantee (1) is important because it means that a MessageChooser can bootstrap a stream from 0 to head simply by always choosing the envelope for that stream if it's available. Consider a case where you want to read the entire currencyCode stream, which has mappings of currency code (e.g. USD) to symbols (e.g. $), and is partitioned by currency code. You might want to join these symbols to a stream called transactions, which is also partitioned by currency code and has a schema like {"currency": "USD", "amount": 1234}. You could then have your StreamTask join the currency symbol to each transaction message and emit messages like {"amount": "$1234"}.
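+ 
+ A sketch of such a join task is shown below. It is illustrative only: it assumes the StreamTask API (process(envelope, collector, coordinator)), map-shaped messages, and a hypothetical "exchangedTransactions" output stream, and it keeps the symbol table in a plain in-memory map rather than Samza's state management.
+ 
+ {{{
+ // Illustrative join sketch; stream names, field names, and the output
+ // stream are assumptions, not part of the design above.
+ import java.util.HashMap;
+ import java.util.Map;
+ 
+ import org.apache.samza.system.IncomingMessageEnvelope;
+ import org.apache.samza.system.OutgoingMessageEnvelope;
+ import org.apache.samza.system.SystemStream;
+ import org.apache.samza.task.MessageCollector;
+ import org.apache.samza.task.StreamTask;
+ import org.apache.samza.task.TaskCoordinator;
+ 
+ public class CurrencyJoinTaskSketch implements StreamTask {
+   // Hypothetical output stream for the joined messages.
+   private static final SystemStream OUTPUT =
+     new SystemStream("kafka", "exchangedTransactions");
+ 
+   // Symbol table built up from the bootstrapped currencyCode stream,
+   // e.g. "USD" -> "$". Kept in memory for simplicity in this sketch.
+   private final Map<String, String> symbols = new HashMap<String, String>();
+ 
+   @SuppressWarnings("unchecked")
+   public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
+       TaskCoordinator coordinator) {
+     String stream = envelope.getSystemStreamPartition().getStream();
+ 
+     if ("currencyCode".equals(stream)) {
+       // Assumes key = currency code and message = symbol for this stream.
+       symbols.put((String) envelope.getKey(), (String) envelope.getMessage());
+     } else {
+       // Assumes transactions messages look like {"currency": "USD", "amount": 1234}.
+       Map<String, Object> transaction = (Map<String, Object>) envelope.getMessage();
+       String symbol = symbols.get(transaction.get("currency"));
+ 
+       Map<String, Object> joined = new HashMap<String, Object>();
+       joined.put("amount", symbol + String.valueOf(transaction.get("amount")));
+       collector.send(new OutgoingMessageEnvelope(OUTPUT, joined));
+     }
+   }
+ }
+ }}}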
+ 
+ To bootstrap the currencyCode stream, you need to read it from offset 0 all the way to the last message in the stream (what I'm calling head). It is not desirable to read any message from the transactions stream until the currencyCode stream has been fully read, or else you might try to join a transaction message to a currency code that hasn't yet been read. Using this strategy, the SamzaContainer would get the offset of the last message in the stream at the time it starts; let's say the offset is 240. If the currencyCode stream is configured with samza.reset.offset, then the SamzaContainer will initialize the TaskInstance to start reading from offset 0. A MessageChooser could then be implemented that simply always chooses envelopes from currencyCode if they're available, and otherwise picks envelopes from the transactions stream. Since guarantee (1) ensures the chooser always has a currencyCode message available when choose is called, as long as offset 240 has not yet been reached, the chooser will choose currencyCode messages 0-240 before it chooses any transaction envelopes. This results in a bootstrap behavior where currencyCode is read in its entirety for a given partition before any transaction messages are processed for that partition.
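+ 
+ For this example, the bootstrapping chooser described above could be sketched as follows (illustrative only; the class name is hypothetical and the stream name is hard-coded for clarity):
+ 
+ {{{
+ // Illustrative bootstrap chooser: always prefer currencyCode envelopes, and
+ // only fall back to other streams when no currencyCode envelope is buffered.
+ import java.util.LinkedHashMap;
+ import java.util.Map;
+ 
+ import org.apache.samza.system.IncomingMessageEnvelope;
+ import org.apache.samza.system.SystemStreamPartition;
+ 
+ public class CurrencyCodeBootstrapChooserSketch implements MessageChooser {
+   private final Map<SystemStreamPartition, IncomingMessageEnvelope> buffered =
+     new LinkedHashMap<SystemStreamPartition, IncomingMessageEnvelope>();
+ 
+   public void update(IncomingMessageEnvelope envelope) {
+     buffered.put(envelope.getSystemStreamPartition(), envelope);
+   }
+ 
+   public IncomingMessageEnvelope choose() {
+     // Per guarantee (1), a currencyCode envelope is always buffered here
+     // until that stream has hit head, so other streams wait until then.
+     for (SystemStreamPartition ssp : buffered.keySet()) {
+       if ("currencyCode".equals(ssp.getStream())) {
+         return buffered.remove(ssp);
+       }
+     }
+     // currencyCode has hit head; fall back to any other buffered envelope.
+     if (buffered.isEmpty()) {
+       return null;
+     }
+     SystemStreamPartition next = buffered.keySet().iterator().next();
+     return buffered.remove(next);
+   }
+ }
+ }}}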
+ 
+ The second guarantee (2) is important because it means that the window and 
commit methods will still be called at their expected intervals while the 
SystemConsumers class is waiting for messages from streamsYetToHitHead. For 
example, consider the case where the current currencyCode offset is 237 
(less than the last offset of 240), and the stream becomes unavailable for 10 
minutes. If the MessageChooser has already chosen all envelopes from 
currencyCode, then we can't call choose on it, since it might accidentally 
choose a transaction message. Instead, we have to wait for the currencyCode 
stream to become available again, so we can read the next message and call 
MessageChooser.update with it. Rather than blocking with 
systemConsumer.poll(.., -1), we can instead poll the stream with a non-blocking 
timeout (i.e. 0), and then simply decide not to call the MessageChooser.choose 
method. In such a case, process will return null, which denotes that no new 
messages are available to process, and the window, send, and commit methods get 
called as usual. Then, the process loop repeats, and the currencyCode stream is 
polled again, to see if new messages are available.
+ 
+ 1. Add getLastOffset(String streamName, Partition partition): String to SystemAdmin. (*)
+ 2. Modify SamzaContainer to get SystemAdmin for input stream's system.
+ 3. Modify SamzaContainer to use SystemAdmins.getLastOffset to construct a 
Map[SystemStreamPartition, String], where value is last offset.
+ 4. Modify SystemConsumers to take a lastOffsets: Map[SystemStreamPartition, 
String] parameter in the constructor.
+ 5. Modify SystemConsumers to have a streamsYetToHitHead: 
Set[SystemStreamPartition] variable that is initialized to contain all input 
system stream partitions.
+ 6. Modify SystemConsumers so that chooser.choose is called only if 
streamsYetToHitHead.length == 0 or the chooser has been given a message from 
every stream that has yet to hit head.
+ 7. Modify SystemConsumers to remove chooser.choose.getSystemStreamPartition 
from streamsYetToHitHead whenever 
lastOffsets(envelope.getSystemStreamPartition).equals(envelope.getOffset)
+ 8. Modify SystemConsumers.register to remove SystemStreamPartition from 
streamsYetToHitHead if lastReadOffset.equals(lastOffsets(systemStreamPartition))
+ 9. Add a getLastOffset(String streamName, Partition partition): String method to KafkaSystemAdmin.
+ 
+ (*) Note: we might want to batch these offset requests into getLastOffset(Set[SystemStreamPartition] systemStreamPartitions): Map[SystemStreamPartition, String].
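+ 
+ In Java terms, the proposed lookups (the single-partition form from step 1 and the batched form from this note) would look roughly like the interface below. This is a sketch of the proposal, not an existing Samza API; the interface name is illustrative.
+ 
+ {{{
+ // Sketch of the proposed last-offset lookups on SystemAdmin.
+ import java.util.Map;
+ import java.util.Set;
+ 
+ import org.apache.samza.Partition;
+ import org.apache.samza.system.SystemStreamPartition;
+ 
+ public interface LastOffsetLookupSketch {
+   /** Offset of the last message currently in the given stream partition (step 1). */
+   String getLastOffset(String streamName, Partition partition);
+ 
+   /** Batched variant from the note above: one request for many stream partitions. */
+   Map<SystemStreamPartition, String> getLastOffset(Set<SystemStreamPartition> systemStreamPartitions);
+ }
+ }}}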
+ 
+ 
+ 
+ 
+ DefaultChooser configuration:
+ 
+ {{{
+ task.chooser.priorities.kafka.currencyCodes=1
+ task.chooser.priorities.kafka.transactions=0
+ task.chooser.priorities.<system-name>.<stream-name>=<priority>
+ }}}
+ 
+ Problem: how do we choose between two streams that are both not yet caught up and have the same priority? Possible tie-breakers:
+ 
+  * offset (does a lexicographical sort on the offset string mean anything here?)
+  * number of messages behind head
+  * percent behind
+  * wall-clock time behind
+ 
