The "Pluggable MessageChooser" page has been changed by ChrisRiccomini:
https://wiki.apache.org/samza/Pluggable%20MessageChooser?action=diff&rev1=4&rev2=5

- = Pluggable !MessageChooser =
+ = Pluggable MessageChooser =
  
  == Introduction ==
  
- When a Samza job is reading messages from more than one 
system/stream/partition (SSP), a decision needs to be made about the order in 
which the messages from these SSPs are processed.  For example, if a message is 
available for (stream: AdViewEvent, partition: 7), and so is a message for 
(stream: AdClickEvent, partition: 7), which message should the SamzaContainer 
process next?
+ When a Samza job is reading messages from more than one 
system/stream/partition (SSP), a decision needs to be made about the order in 
which the messages from these SSPs are processed.  For example, if a message is 
available for (stream: !AdViewEvent, partition: 7), and so is a message for 
(stream: !AdClickEvent, partition: 7), which message should the !SamzaContainer 
process next?
  
  It turns out, the order to process messages can be determined based on any 
number of strategies:
  
@@ -31, +31 @@

  
  Some Samza jobs wish to fully consume a stream from offset 0 all the way 
through to the last message in the stream before they process messages from any 
other stream. This is useful for streams that have some key-value data that a 
Samza job wishes to use when processing messages from another stream.
  
- Consider a case where you want to read a currencyCode stream, which has 
mappings of country code (e.g. USD) to symbols (e.g. $), and is partitioned by 
country code. You might want to join these symbols to a stream called 
transactions which is also partitioned by currency, and has a schema like 
{"country": "USD", "amount": 1234}. You could then have your StreamTask join 
the currency symbol to each transaction message, and emit messages like 
{"amount": "$1234"}.
+ Consider a case where you want to read a currencyCode stream, which has 
mappings of country code (e.g. USD) to symbols (e.g. $), and is partitioned by 
country code. You might want to join these symbols to a stream called 
transactions which is also partitioned by currency, and has a schema like 
{"country": "USD", "amount": 1234}. You could then have your !StreamTask join 
the currency symbol to each transaction message, and emit messages like 
{"amount": "$1234"}.
  
  To bootstrap the currencyCode stream, you need to read it from offset 0 all 
the way to the last message in the stream (what I'm calling head). It is not 
desirable to read any message from the transactions stream until the 
currencyCode stream has been fully read, or else you might try to join a 
transaction message to a country code that hasn't yet been read. 
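
A minimal sketch of the join described above, to show why the ordering matters. The class and method names (CurrencyJoiner, onCurrencyCode, onTransaction) are hypothetical and not part of Samza; the sketch only assumes that the task keeps the currencyCode mappings in an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical join logic; not a Samza class. */
final class CurrencyJoiner {
  // Mappings read so far from the currencyCode stream (code -> symbol).
  private final Map<String, String> symbols = new HashMap<>();

  /** Called for each message from the currencyCode stream. */
  void onCurrencyCode(String code, String symbol) {
    symbols.put(code, symbol);
  }

  /**
   * Called for each message from the transactions stream.
   * Returns the joined output, or null if the symbol has not been read yet.
   */
  String onTransaction(String code, long amount) {
    String symbol = symbols.get(code);
    if (symbol == null) {
      return null; // currencyCode stream not bootstrapped far enough
    }
    return "{\"amount\": \"" + symbol + amount + "\"}";
  }
}
```

If a transaction arrives before its mapping, onTransaction has nothing to join against, which is the failure that bootstrapping the currencyCode stream first is meant to avoid.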
  
  ==== Time-aligned streams ====
  
- Some Samza jobs wish to keep their input streams as time-aligned as possible. 
This allows lets StreamTasks to have a smaller buffer when trying to joins or 
buffered sorts.
+ Some Samza jobs wish to keep their input streams as time-aligned as possible. 
This lets !StreamTasks use a smaller buffer when performing joins or 
buffered sorts.
  
- Imagine that a Samza job is reading from two streams: AdViews and AdClicks. 
It's joining AdViews and AdClicks to produce a click-through stream that 
defines the % of views that had a click. This Samza job would buffer all 
AdViews for some period of time (say 5 minutes). As AdClicks come in, the job 
would join the AdView with the AdClick, and emit a message like (ad-id: 1234, 
click: true). Any AdViews that are in the window for more than 5 minutes are 
assumed to have no click, and messages are sent with (ad-id: 2345, click: 
false).
+ Imagine that a Samza job is reading from two streams: !AdViews and !AdClicks. 
It's joining !AdViews and !AdClicks to produce a click-through stream that 
defines the % of views that had a click. This Samza job would buffer all 
!AdViews for some period of time (say 5 minutes). As !AdClicks come in, the job 
would join the !AdView with the !AdClick, and emit a message like (ad-id: 1234, 
click: true). Any !AdViews that are in the window for more than 5 minutes are 
assumed to have no click, and messages are sent with (ad-id: 2345, click: 
false).
  
- The problem that this kind of job runs into is that it wants to keep the 
AdView events and AdClick events as closely aligned (based on the time the 
messages were sent) as possible, because it allows the job to shrink the 
buffering window, which frees up memory and disk space. If you can keep AdView 
and AdClick messages within 2 minutes of each other, you only need a 2 minute 
buffer. It doesn't make any sense to process and buffer an AdView event from 
two seconds ago if an AdClick from 1 minute ago can be processed and its AdView 
can be removed from the buffer (thereby freeing up space).
+ The problem that this kind of job runs into is that it wants to keep the 
!AdView events and !AdClick events as closely aligned (based on the time the 
messages were sent) as possible, because it allows the job to shrink the 
buffering window, which frees up memory and disk space. If you can keep !AdView 
and !AdClick messages within 2 minutes of each other, you only need a 2 minute 
buffer. It doesn't make any sense to process and buffer an !AdView event from 
two seconds ago if an !AdClick from 1 minute ago can be processed and its 
!AdView can be removed from the buffer (thereby freeing up space).
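
One way to picture the time-aligned ordering is a component that always hands back the oldest pending message. This is only a sketch: TimedEnvelope and OldestFirstChooser are hypothetical names, and the sentAtMillis field is an assumption, since nothing above says a message carries a timestamp that Samza can read.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical message wrapper with an assumed send timestamp. */
final class TimedEnvelope {
  final String stream;
  final long sentAtMillis; // assumption: the send time is available
  final String payload;

  TimedEnvelope(String stream, long sentAtMillis, String payload) {
    this.stream = stream;
    this.sentAtMillis = sentAtMillis;
    this.payload = payload;
  }
}

/** Returns pending messages oldest-first, regardless of stream. */
final class OldestFirstChooser {
  private final PriorityQueue<TimedEnvelope> pending =
      new PriorityQueue<>(
          Comparator.comparingLong((TimedEnvelope e) -> e.sentAtMillis));

  /** Offer a message that is available for processing. */
  void update(TimedEnvelope envelope) {
    pending.add(envelope);
  }

  /** Returns the oldest pending message, or null if there is none. */
  TimedEnvelope choose() {
    return pending.poll();
  }
}
```

With an AdView sent two seconds ago and an AdClick sent one minute ago both pending, this sketch returns the AdClick first, so its matching AdView can leave the buffer sooner.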
  
- === !MessageChooser ===
+ === MessageChooser ===
  
  In Samza 0.7.0 we've introduced a class called !MessageChooser. This is an 
object that is given up to one message at a time from each input 
stream/partition pair (when they're available). The chooser's job is to return 
the incoming message envelope that should be processed next.
  
- In plain-spoken English, Samza says, "I have IncomingMessageEnvelopes from 
stream/partition X, and stream/partitionY, which should I process next?" It's 
the !MessageChooser's job to answer this question.
+ In plain-spoken English, Samza says, "I have !IncomingMessageEnvelopes from 
stream/partition X, and stream/partition Y, which should I process next?" It's 
the !MessageChooser's job to answer this question.
  
  The !MessageChooser's interface currently looks like this:
  
  {{{
- interface !MessageChooser {
+ interface MessageChooser {
    /**
     * Notify the chooser that a new envelope is available for processing. A
-    * !MessageChooser will receive, at most, one outstanding envelope per
+    * MessageChooser will receive, at most, one outstanding envelope per
     * system/stream/partition combination. For example, if update is called for
     * partition 7 of kafka.mystream, then update will not be called with an
     * envelope from partition 7 of kafka.mystream until the previous envelope 
has
@@ -65, +65 @@

     * @param envelope
     *          An unprocessed envelope.
     */
-   void update(IncomingMessageEnvelope envelope);
+   void update(!IncomingMessageEnvelope envelope);
  
    /**
     * The choose method is invoked when the SamzaContainer is ready to process 
a
@@ -76, +76 @@

     * @return The next envelope to process, or null if the chooser has no
     *         messages or doesn't want to process any at the moment.
     */
-   IncomingMessageEnvelope choose();
+   !IncomingMessageEnvelope choose();
  }
  }}}
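
As an illustration of how small an implementation of this interface can be, here is a sketch of a first-in, first-out chooser. To keep it self-contained it does not use the Samza classes: Envelope and Chooser are hypothetical stand-ins for !IncomingMessageEnvelope and !MessageChooser, with only the two methods shown above.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for Samza's IncomingMessageEnvelope. */
final class Envelope {
  final String ssp;     // system/stream/partition, e.g. "kafka.mystream.7"
  final String offset;
  final Object message;

  Envelope(String ssp, String offset, Object message) {
    this.ssp = ssp;
    this.offset = offset;
    this.message = message;
  }
}

/** Hypothetical stand-in mirroring the two methods of the interface. */
interface Chooser {
  void update(Envelope envelope);

  Envelope choose();
}

/** Returns envelopes in the order they were handed to update. */
final class FifoChooser implements Chooser {
  // Not thread-safe; the sketch assumes single-threaded use.
  private final Queue<Envelope> pending = new ArrayDeque<>();

  @Override
  public void update(Envelope envelope) {
    pending.add(envelope);
  }

  @Override
  public Envelope choose() {
    // poll() returns null when nothing is pending, and removes the
    // envelope it returns, so no envelope is handed out twice.
    return pending.poll();
  }
}
```

Because update is called with at most one outstanding envelope per SSP, the queue in this sketch never holds more than one envelope per SSP at a time.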
  
@@ -84, +84 @@

  
   1. !SystemConsumers buffers messages from all SSPs as they become available.
   2. If !MessageChooser has no messages for a given SSP, and !SystemConsumers 
has a message in its buffer for the SSP, the !MessageChooser will be updated 
once with the next message in the buffer.
-  3. When SamzaContainer is ready to process another message, it calls 
!SystemConsumers.choose, which in-turn calls !MessageChooser.choose.
+  3. When !SamzaContainer is ready to process another message, it calls 
!SystemConsumers.choose, which in turn calls !MessageChooser.choose.
  
  The contract between the !MessageChooser and the !SystemConsumers class is:
  
@@ -92, +92 @@

   * A null return from !MessageChooser.choose means no envelopes should be 
processed at the moment.
   * A !MessageChooser may elect to return null when choose is called, even if 
unprocessed messages have been given by the update method.
   * A !MessageChooser will not have any of its in-memory state restored in the 
event of a failure.
-  * Blocking operations (such as Thread.sleep) will block all processing in 
the entire SamzaContainer.
+  * Blocking operations (such as Thread.sleep) will block all processing in 
the entire !SamzaContainer.
   * A !MessageChooser should never return the same envelope more than once.
   * Non-deterministic (e.g. time-based) !MessageChoosers are allowed.
   * A !MessageChooser does not need to be thread-safe.
@@ -116, +116 @@

  
  Such an implementation would allow us to support the QoS, bootstrapping, and 
time-alignment use cases.
  
- == Half-baked proposal ==
+ == Proposal ==
  
  First, some Axioms that I believe to be true:
  
   1. We can't support prioritization based on message content without the 
!MessageChooser being pluggable.
-  2. We can't support time-aligned prioritization without either a pluggable 
!MessageChooser (that can introspect a message and extract the timestamp), a 
timestamp field in IncomingMessageEnvelope, or some equivalent way to get 
message stream-time.
+  2. We can't support time-aligned prioritization without either a pluggable 
!MessageChooser (that can introspect a message and extract the timestamp), a 
timestamp field in !IncomingMessageEnvelope, or some equivalent way to get 
message stream-time.
   3. The !MessageChooser must know if a stream is "caught up" or still being 
bootstrapped when making a decision about which message to process next.
   4. Once a stream is "caught up" (envelope.getOffset == 
offsetOfLastMessageInStream at least once), we can assume the stream has been 
fully "bootstrapped".
  
@@ -133, +133 @@

  
  The changes required to do this are:
  
- 1. Add getLastOffset(String streamName, Partition partition): String (*) to 
SystemAdmin interface.
+  1. Add getLastOffset(String streamName, Partition partition): String (*) to 
!SystemAdmin interface.
- 2. Modify SamzaContainer to get SystemAdmin for input stream's system.
+  2. Modify !SamzaContainer to get !SystemAdmin for input stream's system.
- 3. Modify SamzaContainer to use SystemAdmins.getLastOffset to construct a 
Map[SystemStreamPartition, String], where value is last offset.
+  3. Modify !SamzaContainer to use !SystemAdmins.getLastOffset to construct a 
Map[!SystemStreamPartition, String], where value is last offset.
- 4. Modify !SystemConsumers to take a lastOffsets: Map[SystemStreamPartition, 
String] parameter in the constructor.
+  4. Modify !SystemConsumers to take a lastOffsets: 
Map[!SystemStreamPartition, String] parameter in the constructor.
- 5. Modify !SystemConsumers to have a streamsYetToHitHead: 
Set[SystemStreamPartition] variable that is initialized to contain all input 
system stream partitions.
+  5. Modify !SystemConsumers to have a streamsYetToHitHead: 
Set[!SystemStreamPartition] variable that is initialized to contain all input 
system stream partitions.
- 6. Modify !SystemConsumers so that chooser.choose is called only if 
streamsYetToHitHead.length == 0 or the chooser has been given a message from 
every stream that has yet to hit head.
+  6. Modify !SystemConsumers so that chooser.choose is called only if 
streamsYetToHitHead.length == 0 or the chooser has been given a message from 
every stream that has yet to hit head.
- 7. Modify !SystemConsumers to remove chooser.choose.getSystemStreamPartition 
from streamsYetToHitHead whenever 
lastOffsets(envelope.getSystemStreamPartition).equals(envelope.getOffset)
+  7. Modify !SystemConsumers to remove chooser.choose.getSystemStreamPartition 
from streamsYetToHitHead whenever 
lastOffsets(envelope.getSystemStreamPartition).equals(envelope.getOffset)
- 8. Modify !SystemConsumers.register to remove SystemStreamPartition from 
streamsYetToHitHead if lastReadOffset.equals(lastOffsets(systemStreamPartition))
+  8. Modify !SystemConsumers.register to remove !SystemStreamPartition from 
streamsYetToHitHead if 
lastReadOffset.equals(lastOffsets(systemStreamPartition))
- 8. Add a getLastOffset(String streamName, Partition partition): String method 
to KafkaSystemAdmin
+  9. Add a getLastOffset(String streamName, Partition partition): String 
method to !KafkaSystemAdmin
  
- * Note, we might want to batch these offset requests to 
getLastOffset(Set[SystemStreamPartition] systemStreamPartition): 
Map[SystemStreamPartition, String]
+ * Note, we might want to batch these offset requests to 
getLastOffset(Set[!SystemStreamPartition] systemStreamPartition): 
Map[!SystemStreamPartition, String]
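
The bookkeeping in steps 5 through 7 can be sketched roughly as follows. This is not the !SystemConsumers code: BootstrapGate and its method names are hypothetical, SSPs are represented as plain strings, and the sketch assumes lastOffsets has one entry for every input SSP.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper tracking which SSPs have yet to hit head. */
final class BootstrapGate {
  private final Map<String, String> lastOffsets;
  private final Set<String> streamsYetToHitHead;
  // SSPs for which the chooser currently holds an unprocessed envelope.
  private final Set<String> pending = new HashSet<>();

  BootstrapGate(Map<String, String> lastOffsets) {
    this.lastOffsets = new HashMap<>(lastOffsets);
    // Step 5: start with every input SSP (assumed to be the map's keys).
    this.streamsYetToHitHead = new HashSet<>(lastOffsets.keySet());
  }

  /** Record that the chooser was updated with an envelope for this SSP. */
  void onUpdate(String ssp) {
    pending.add(ssp);
  }

  /**
   * Step 6: choose may be called only if no stream is left to hit head,
   * or the chooser holds an envelope from every stream that is.
   * containsAll is trivially true for an empty set.
   */
  boolean mayChoose() {
    return pending.containsAll(streamsYetToHitHead);
  }

  /** Step 7: a chosen envelope at the last offset means the SSP hit head. */
  void onChosen(String ssp, String offset) {
    pending.remove(ssp);
    if (offset.equals(lastOffsets.get(ssp))) {
      streamsYetToHitHead.remove(ssp);
    }
  }
}
```

The gate only decides when choose may be called; which envelope gets picked is still left to the chooser.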
  
  === QoS ===
  
@@ -189, +189 @@

  
  In cases where two streams have the same priority, we need to implement a 
strategy (either time-aligned, round robin, or some proxy for time-aligned). I 
haven't come up with an opinion on which strategy we should use yet.
  
- === !MessageChooser interface ===
+ === MessageChooser interface ===
  
  I think the !MessageChooser interface is fine as it is. Initially, I wanted 
to add register/start/stop methods to it.
  
- The main motivation for start/stop is allow developers to setup a client that 
queries some outside service to make picking decisions. I'm not saying that 
this is advisable, but I know people will try and do it. Without stop, there's 
no way to shut down the client when the service stops. If we assume the service 
never stops, then this isn't a problem, but if there is a definite "end" to the 
processor (i.e. TaskCoordinator.shutdown), then the chooser needs a graceful 
shutdown.
+ The main motivation for start/stop is that it allows developers to set up a 
client that queries some outside service to make picking decisions. I'm not 
saying that this is advisable, but I know people will try and do it. Without 
stop, there's no way to shut down the client when the service stops. If we 
assume the service never stops, then this isn't a problem, but if there is a 
definite "end" to the processor (i.e. !TaskCoordinator.shutdown), then the 
chooser needs a graceful shutdown.
  
- The motivation for register is that there are situations where you want to 
initialize your data structures (or whatever) on startup before any messages 
are received. Letting the !MessageChooser know which SystemStreamPartitions 
it's going to be receiving messages from just seems like a good idea..
+ The motivation for register is that there are situations where you want to 
initialize your data structures (or whatever) on startup before any messages 
are received. Letting the !MessageChooser know which !SystemStreamPartitions 
it's going to be receiving messages from just seems like a good idea.
  
  I have since backed off on this idea since I can't come up with a good 
concrete example of why we need them, and all of the reference implementations 
we've written so far wouldn't need them.
  
+ == Open questions ==
+ 
+ A (probably partial) list of open questions:
+ 
+  1. How should we prioritize streams that are the same priority in the 
!DefaultChooser?
+  2. Should we add a concept of time to Samza?
+  3. What is the behavior of aligning streams by messages behind high 
watermark, or percent behind high watermark?
+ 
