Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Samza Wiki" for change 
notification.

The "Pluggable MessageChooser" page has been changed by ChrisRiccomini:
https://wiki.apache.org/samza/Pluggable%20MessageChooser?action=diff&rev1=3&rev2=4

- = Pluggable MessageChooser =
+ = Pluggable !MessageChooser =
  
  == Introduction ==
  
  When a Samza job is reading messages from more than one 
system/stream/partition (SSP), a decision needs to be made about the order in 
which the messages from these SSPs are processed.  For example, if a message is 
available for (stream: AdViewEvent, partition: 7), and so is a message for 
(stream: AdClickEvent, partition: 7), which message should the SamzaContainer 
process next?
  
- It turns out, the order in which messages are processed by SamzaContainer 
impacts a number of use cases.
+ It turns out, the order to process messages can be determined based on any 
number of strategies:
+ 
+  * Pick which message to read based on its contents (e.g. stream time).
+  * Backlog (pick whichever stream is farther behind the head of the stream).
+  * Initialization (always pick stream X until it's caught up to the head of 
the stream).
+  * Quality of service (if I've picked too many messages from topic X, don't 
pick from it for a while, so we don't starve other topics).
+  * Round robin (just pick in a circular buffer)
+  * Batching (pick 100 messages from stream X, then 100 messages from stream 
Y, etc).
  
  === Use cases ===
  
@@ -36, +43 @@

  
  The problem that this kind of job runs into is that it wants to keep the 
AdView events and AdClick events as closely aligned (based on the time the 
messages were sent) as possible, because it allows the job to shrink the 
buffering window, which frees up memory and disk space. If you can keep AdView 
and AdClick messages within 2 minutes of each other, you only need a 2 minute 
buffer. It doesn't make any sense to process and buffer an AdView event from 
two seconds ago if an AdClick from 1 minute ago can be processed and its AdView 
can be removed from the buffer (thereby freeing up space).
  
- === MessageChooser ===
+ === !MessageChooser ===
  
- In Samza 0.7.0 we've introduced a class called MessageChooser. This is an 
object that is given up to one message at a time from each input 
stream/partition pair (when they're available). The chooser's job is to return 
the incoming message envelope that should be processed next.
+ In Samza 0.7.0 we've introduced a class called !MessageChooser. This is an 
object that is given up to one message at a time from each input 
stream/partition pair (when they're available). The chooser's job is to return 
the incoming message envelope that should be processed next.
  
- In plain-spoken English, the Samza says, "I have IncomingMessageEnvelopes 
from stream/partition X, and stream/partitionY, which should I process next?" 
It's the MessageChooser's job to answer this question.
+ In plain-spoken English, Samza says, "I have IncomingMessageEnvelopes from 
stream/partition X, and stream/partitionY, which should I process next?" It's 
the !MessageChooser's job to answer this question.
  
- The MessageChooser's interface currently looks like this:
+ The !MessageChooser's interface currently looks like this:
  
  {{{
- interface MessageChooser {
+ interface !MessageChooser {
    /**
     * Notify the chooser that a new envelope is available for a processing. A
-    * MessageChooser will receive, at most, one outstanding envelope per
+    * !MessageChooser will receive, at most, one outstanding envelope per
     * system/stream/partition combination. For example, if update is called for
     * partition 7 of kafka.mystream, then update will not be called with an
     * envelope from partition 7 of kafka.mystream until the previous envelope 
has
@@ -73, +80 @@

  }
  }}}
  
- The manner in which MessageChooser is used is:
+ The manner in which !MessageChooser is used is:
  
-  1. SystemConsumers buffers messages from all SSPs as they become available.
+  1. !SystemConsumers buffers messages from all SSPs as they become available.
-  2. If MessageChooser has no messages for a given SSP, and SystemConsumers 
has a message in its buffer for the SSP, the MessageChooser will be updated 
once with the next message in the buffer.
+  2. If !MessageChooser has no messages for a given SSP, and !SystemConsumers 
has a message in its buffer for the SSP, the !MessageChooser will be updated 
once with the next message in the buffer.
-  3. When SamzaContainer is ready to process another message, it calls 
SystemConsumers.choose, which in-turn calls MessageChooser.choose.
+  3. When SamzaContainer is ready to process another message, it calls 
!SystemConsumers.choose, which in-turn calls !MessageChooser.choose.
  
- The contract between the MessageChooser and the SystemConsumers class is:
+ The contract between the !MessageChooser and the !SystemConsumers class is:
  
   * Update can be called multiple times before choose is called.
-  * A null return from MessageChooser.choose means no envelopes should be 
processed at the moment.
+  * A null return from !MessageChooser.choose means no envelopes should be 
processed at the moment.
-  * A MessageChooser may elect to return null when choose is called, even if 
unprocessed messages have been given by the update method.
+  * A !MessageChooser may elect to return null when choose is called, even if 
unprocessed messages have been given by the update method.
-  * A MessageChooser will not have any of its in-memory state restored in the 
event of a failure.
+  * A !MessageChooser will not have any of its in-memory state restored in the 
event of a failure.
   * Blocking operations (such as Thread.sleep) will block all processing in 
the entire SamzaContainer.
-  * A MessageChooser should never return the same envelope more than once.
+  * A !MessageChooser should never return the same envelope more than once.
-  * Non-deterministic (e.g. time-based) MessageChoosers are allowed.
+  * Non-deterministic (e.g. time-based) !MessageChoosers are allowed.
-  * A MessageChooser does not need to be thread-safe.
+  * A !MessageChooser does not need to be thread-safe.
  
  == Problem ==
  
- The MessageChooser is currently hard-wired to a default MessageChooser 
implementation, which uses a round-robin strategy to pick messages from all 
streams with outstanding messages. This implementation is insufficient for a 
number of use cases.
+ The !MessageChooser is currently hard-wired to a default !MessageChooser 
implementation, which uses a round-robin strategy to pick messages from all 
streams with outstanding messages. This implementation is insufficient.
  
- === Problems ===
+ Problems to think about:
  
- Questions worth asking are:
+  1. Do we accept that the use cases and strategies listed above are 
legitimate?
+  2. Is the !MessageChooser the appropriate way to support the use cases 
outlined above?
+  3. Is the !MessageChooser's interface correct to support the use cases 
outlined above?
+  4. Can we implement a single default !MessageChooser that solves all of the 
use cases listed above?
+  5. Should the !MessageChooser be pluggable?
  
+ The general feedback I've gotten is that question 4 is most important. I 
pretty much agree. The default implementation should:
- 1. Is the MessageChooser the appropriate way to support the use cases 
outlined above?
- 2. Can we implement a single default MessageChooser that solves all of the 
use cases listed above?
- 3. Should the MessageChooser be pluggable?
- 4. Is the MessageChooser's interface correct?
  
+  1. Allow preferring certain streams for the state bootstrap.
+  2. Keep all streams of the same priority roughly in sync.
  
+ Such an implementation would allow us to support the QoS, bootstrapping, and 
time-alignment use cases.
  
- In legacy (non-open sourced) versions of Samza, there was a configuration 
called "streams.%s.consumer.max.bytes.per.sec" for throttling the # of bytes 
the Task will read from a stream. In Samza 0.7.0, the version that is in Apache 
Samza's master git repository right now, this throttling feature has been 
removed, and we've introduced the concept of a MessageChooser instead.  Our 
justification for removing this style of throttling in 0.7.0 is that usually 
what folks really want is:
+ == Half-baked proposal ==
  
+ First, some Axioms that I believe to be true:
-  1. Per-stream prioritization (always take messages from stream X before 
messages from stream Y if both have messages available).
-  2. Per-process isolation (I want to know that my process gets at least X % 
of the CPU, disk, network, etc).
-  3. A way to intentionally ignore a stream even if messages are available. 
This can be used to prevent a Samza job from hammering away on a remote 
database or web service. Samza jobs can process messages much faster than a 
remote DB or service might be able to process RPC requests. When a Samza job is 
triggering an RPC call to a remote web service to join data to events, for 
instance, developers want a way to limit processing to prevent overwhelming the 
web service. By intentionally ignoring messages from a stream, you can slow 
down the rate at which the StreamTask gets messages, and thereby slow down the 
rate at which RPC calls are made.
  
- It's not actually the case that people want to voluntarily go slower than 
they can (which is what the old throttler was doing).
+  1. We can't support prioritization based on message content without the 
!MessageChooser being pluggable.
+  2. We can't support time-aligned prioritization without either a pluggable 
!MessageChooser (that can introspect a message and extract the timestamp), a 
timestamp field in IncomingMessageEnvelope, or some equivalent way to get 
message stream-time.
+  3. The !MessageChooser must know if a stream is "caught up" or still being 
bootstrapped when making a decision about which message to process next.
+  4. Once a stream is "caught up" (envelope.getOffset == 
offsetOfLastMessageInStream at least once), we can assume the stream has been 
fully "bootstrapped".
  
- For (2), we are going to be relying on CGroups.
+ I believe this set of rules implies that we have to have a pluggable 
!MessageChooser, unless we decide to add some concept of time to Samza.
  
- For (3), we're adding a new feature described in the [[Throttling]] design 
proposal and [[https://issues.apache.org/jira/browse/SAMZA-24|SAMZA-24]].
+ === Bootstrapping ===
  
- This design proposal describes how we will allow developers to define which 
message should be processed next when multiple stream/partitions have 
outstanding messages available.
+ To support bootstrapping, I propose that we change the !SystemConsumers 
behavior slightly. Right now, !SystemConsumers alternates between calling 
update and choose methods. We should change !SystemConsumers to only call 
choose if the !MessageChooser has been updated with envelopes from all SSPs 
that are not yet at head. If we make this change, then a simple hard-coded 
priority chooser will allow us to bootstrap simply by prioritizing the 
bootstrap stream as higher priority than anything else (e.g. currencyCodes=1, 
transactions=0). If you do this, the chooser will always choose the bootstrap 
stream, and the !SystemConsumers will guarantee that the chooser will always 
have a message from the bootstrap stream until it's at head.
  
- === Use Case ===
+ The changes required to do this are:
  
- An example use case is a Samza task that will be consuming multiple streams 
where some streams may be from live systems that have stricter SLA 
(low-latency) requirements and must always be prioritized over other streams 
that may be from batch systems. The legacy throttling approach that we were 
using before (giving the batch system a max of 1mb/s) is not the ideal way to 
express this type of stream prioritization because configuring the "batch" 
streams with a low consumption rate will decrease the overall throughput of the 
system when there is no data in the "live" streams. Furthermore, we'll might 
want to throttle each "batch" stream based on external signals that can change 
over time. Because of the dynamic nature of these external signals, we would 
like to have a programmatic interface that can dynamically change the 
prioritization as the signal changes.
- 
- Our current 0.7.0 MessageChooser implementation just round robins between 
streams. This has the effect of throttling the "batch" streams you're talking 
about, when messages are available from the "live" streams. When messages are 
not available from the live stream, the chooser will just choose the batch 
messages every time, thereby causing the task to process messages from the 
batch stream as fast as possible.
- 
- One caveat here is that the MessageChooser is currently not pluggable. We've 
hard coded this behavior into a "DefaultChooser" for now. The motivation for 
keeping it from being pluggable is that we're not entirely certain of the 
interface yet, and we're also not sure how well it will integrate with our 
fault tolerance and state management semantics (what happens if a pluggable 
MessageChooser is maintaining state, and there's a failure?).
- 
- This leads to the question: is the round robin approach the best approach for 
a generic message chooser (given that it's not pluggable at this time)? I think 
the answer is that it's not. A better approach would simply be a priority list 
that says, "Always process stream X, then stream Y, then stream Z." This would 
actually be better for your use case, since you could ALWAYS process "live" 
messages first, not just round-robin between live and batch streams.
- 
- Another alternative would be, "always process the stream that is farthest 
behind," for some definition of "farthest behind". Some definitions include:
- 
- 1) Farthest behind in wall-clock time (now() - timestamp in message).
- 2) Farthest number of messages behind high watermark (when reading from 
Kafka).
- 
- What's your take on this new 0.7.0 feature? I think the existing 
MessageChooser would accomodate your needs, as is, but could be made even 
better with a priority list.
- 
- Questions:
- 
- 1. Should the MessageChooser be pluggable?
- 2. Is the MessageChooser interface correct?
- 3. What should the default implementation do?
- 
- == Potential Solutions ==
- 
- == Proposal ==
- 
- == Open Questions ==
- 
- 
- 
- 
- I believe that this implementation guarantees the following:
- 
- 1. The chooser will have a message from all streams that have not yet hit 
head when choose is called.
- 2. The main event loop will not be blocked while we are waiting for messages 
from streams that have yet to hit head.
- 
- The first guarantee (1) is important because it means that a MessageChooser 
can bootstrap a stream from 0 to head simply by always choosing the envelope 
for the stream if its available. Consider a case where you want to read the 
entire currencyCode stream, which has mappings of country code (e.g. USD) to 
symbols (e.g. $), and is partitioned by country code. You might want to join 
these symbols to a stream called transactions which is also partitioned by 
currency, and has a schema like {"country": "USD", "amount": 1234}. You could 
then have your StreamTask join the currency symbol to each transaction message, 
and emit messages like {"amount": "$1234"}.
- 
- To bootstrap the currencyCode stream, you need to read it from offset 0 all 
the way to the last message in the stream (what I'm calling head). It is not 
desirable to read any message from the transactions stream until the 
currencyCode stream has been fully read, or else you might try to join a 
transaction message to a country code that hasn't yet been read. Using the 
strategy above, SamzaContainer would get the offset of the last message in the 
stream at the time it starts; let's say the offset is 240. If the currencyCode 
stream is configured with samza.reset.offset, then SamzaContainer will 
initialize the TaskInstance to start reading from offset 0. A MessageChooser 
could then be implemented that simply always chooses envelopes from 
currencyCode if they're available, otherwise it picks envelopes from the 
transaction stream. Since the chooser is (1) guaranteed to always have a 
message available from currencyCode when choose is called if offset 240 has not 
yet been reached, this means the chooser will choose currencyCode messages 
0-240 before it chooses any transaction envelopes. This results in a bootstrap 
behavior where currencyCode is read in its entirety for a given partition 
before any transaction messages are processed for the partition.
- 
- The second guarantee (2) is important because it means that the window and 
commit methods will still be called at their expected intervals while the 
SystemConsumers class is waiting for messages from streamsYetToHitHead. For 
example, consider the case where it the current currencyCode offset is 237 
(less than the last offset of 240), and the stream becomes unavailable for 10 
minutes. If the MessageChooser has already chosen all envelopes from 
currencyCode, then we can't call choose on it, since it might accidentally 
choose a transaction message. Instead, we have to wait for the currencyCode 
stream to become available again, so we can read the next message and call 
MessageChooser.update with it. Rather than blocking with 
systemConsumer.poll(.., -1), we can instead poll the stream with a non-blocking 
timeout (i.e. 0), and then simply decide not to call the MessageChooser.choose 
method. In such a case, process will return null, which denotes that no new 
messages are available to process, and the window, send, and commit methods get 
called as usual. Then, the process loop repeats, and the currencyCode stream is 
polled again, to see if new messages are available.
- 
- 1. Add getLastOffset(String streamName, Partition partition): String (*)
+ 1. Add getLastOffset(String streamName, Partition partition): String (*) to 
SystemAdmin interface.
  2. Modify SamzaContainer to get SystemAdmin for input stream's system.
  3. Modify SamzaContainer to use SystemAdmins.getLastOffset to construct a 
Map[SystemStreamPartition, String], where value is last offset.
- 4. Modify SystemConsumers to take a lastOffsets: Map[SystemStreamPartition, 
String] parameter in the constructor.
+ 4. Modify !SystemConsumers to take a lastOffsets: Map[SystemStreamPartition, 
String] parameter in the constructor.
- 5. Modify SystemConsumers to have a streamsYetToHitHead: 
Set[SystemStreamPartition] variable that is initialized to contain all input 
system stream partitions.
+ 5. Modify !SystemConsumers to have a streamsYetToHitHead: 
Set[SystemStreamPartition] variable that is initialized to contain all input 
system stream partitions.
- 6. Modify SystemConsumers so that chooser.choose is called only if 
streamsYetToHitHead.length == 0 or the chooser has been given a message from 
every stream that has yet to hit head.
+ 6. Modify !SystemConsumers so that chooser.choose is called only if 
streamsYetToHitHead.length == 0 or the chooser has been given a message from 
every stream that has yet to hit head.
- 7. Modify SystemConsumers to remove chooser.choose.getSystemStreamPartition 
from streamsYetToHitHead whenever 
lastOffsets(envelope.getSystemStreamPartition).equals(envelope.getOffset)
+ 7. Modify !SystemConsumers to remove chooser.choose.getSystemStreamPartition 
from streamsYetToHitHead whenever 
lastOffsets(envelope.getSystemStreamPartition).equals(envelope.getOffset)
- 8. Modify SystemConsumers.register to remove SystemStreamPartition from 
streamsYetToHitHead if lastReadOffset.equals(lastOffsets(systemStreamPartition))
+ 8. Modify !SystemConsumers.register to remove SystemStreamPartition from 
streamsYetToHitHead if lastReadOffset.equals(lastOffsets(systemStreamPartition))
  8. Add a getLastOffset(String streamName, Partition partition): String method 
to KafkaSystemAdmin
  
  * Note, we might want to batch these offset requests to 
getLastOffset(Set[SystemStreamPartition] systemStreamPartition): 
Map[SystemStreamPartition, String]
  
+ === QoS ===
  
+ This is actually the easiest use case. Again, I think a simple priority 
chooser will take care of this. We don't even need to change the 
!SystemConsumers behavior to support this use case. If we set the priority for 
real-time=1 and batch-stream=0, then real-time messages will always be 
processed before batch messages, provided that they're available. If we make 
the change to !SystemConsumers outlined in the bootstrapping section, this will 
also guarantee that, when a processor is stopped and started, the real-time 
stream will be fully caught up to "head" before any batch messages are 
processed.
  
+ === Time alignment ===
  
+ As mentioned in the axiom list above, I don't think we can support 
time-aligned streams without either adding a concept of time to Samza or 
allowing the !MessageChooser to introspect into the message. I am abandoning 
supporting this feature in the default message chooser. Instead, I think we 
should just support pluggable !MessageChoosers, and allow developers to 
implement their own time-aligned !MessageChoosers.
- DefaultChooser
-   task.chooser.priorities.kafka.currencyCodes=1
-   task.chooser.priorities.kafka.transactions=0
-   task.chooser.priorities.<system-name>.<stream-name>=<priority>
  
+ A question worth asking is: can we make the default chooser support a 
poor-man's version of this? Maybe we can't align directly by stream-time, but 
other proxies are:
- problem: how to choose between two streams that are both not in caught up, 
and have the same priority?
-   offset (does lexicographical sort on offset string mean anything for this?)
-   number of messages behind head
-   percent behind
-   wall-clock time behind
  
+  * Choose the message with the smallest offset.
+  * Choose the message from the stream with the most unprocessed messages.
+  * Choose the message from the stream that is the farthest % behind head.
+ 
+ ==== Offsets ====
+ 
+ Offset is the only one we could support with no API changes. The problems 
with supporting offset-aligned streams are:
+ 
+  1. Different stream systems might have different offset styles. How do you 
sort between two entirely different offset styles?
+  2. Offsets are represented as strings. Kafka offsets are longs. Sorting 
strings that contains longs using lexicographical sort leads to "2" > "100".
+ 
+ Additionally, the behavior of a chooser that chooses messages by lowest 
offset is somewhat unintuitive. If you have a Kafka stream X with current 
offset 1 billion, and a new Kafka stream Y with offset 0, prioritizing by 
offset effectively means that you are always going to take messages from stream 
Y until its offset number catches up to stream X, which might never happen if 
stream X gets more messages/sec than stream Y.
+ 
+ ==== Messages behind head ====
+ 
+ I need to think about this a bit more.
+ 
+ ==== Percent behind head ====
+ 
+ I need to think about this a bit more.
+ 
+ === DefaultChooser ===
+ 
+ I think the default chooser should be a simple hard-coded priority chooser, 
which lets you say, "If you have messages from stream X, always read them 
before messages from stream Y." This can be implemented with a config:
+ 
+ {{{
+ task.chooser.priorities.kafka.currencyCode=1
+ task.chooser.priorities.kafka.transactions=0
+ }}}
+ 
+ The convention is task.chooser.priorities.<system>.<stream>=<priority>, where 
higher priority streams get processed between lower priority streams.
+ 
+ In cases where two streams have the same priority, we need to implement a 
strategy (either time-aligned, round robin, or some proxy for time-aligned). I 
haven't come up with an opinion on which strategy we should use yet.
+ 
+ === !MessageChooser interface ===
+ 
+ I think the !MessageChooser interface is fine as it is. Initially, I wanted 
to add register/start/stop methods to it.
+ 
+ The main motivation for start/stop is allow developers to setup a client that 
queries some outside service to make picking decisions. I'm not saying that 
this is advisable, but I know people will try and do it. Without stop, there's 
no way to shut down the client when the service stops. If we assume the service 
never stops, then this isn't a problem, but if there is a definite "end" to the 
processor (i.e. TaskCoordinator.shutdown), then the chooser needs a graceful 
shutdown.
+ 
+ The motivation for register is that there are situations where you want to 
initialize your data structures (or whatever) on startup before any messages 
are received. Letting the !MessageChooser know which SystemStreamPartitions 
it's going to be receiving messages from just seems like a good idea..
+ 
+ I have since backed off on this idea since I can't come up with a good 
concrete example of why we need them, and all of the reference implementations 
we've written so far wouldn't need them.
+ 

Reply via email to