[ 
https://issues.apache.org/jira/browse/SAMZA-71?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13864592#comment-13864592
 ] 

Jakob Homan commented on SAMZA-71:
----------------------------------

Sidestepping the above discussion regarding co-grouping, it would be reasonably 
easy to support two more types of partitioning strategies that could be 
immediately useful: one topic-partition (TP) per task instance (TI) and one TI 
per Samza container.

Because of what's described above, we're severely limited in our ability to 
scale jobs in ways that are not functions of the underlying streams' partition 
count.  SAMZA-82 changed the above description of how partitioning is done.  
Now, the AM determines the topics and partitions that exist across the job and 
explicitly tells the containers which TPs they are responsible for.  The 
containers then group those TPs by their partition and create a new TI for 
each.  By adding another directive from the AM to the container of how to group 
the TIs, we can handle these new strategies.

Much of the complexity above is dictated by the current format of the 
checkpoint log, which writes all of the TP offsets that a particular TI is 
responsible for as one map.  I'd like to change this so that each TP and offset 
is written to the checkpoint log by itself.  On startup, each container 
will read through the entire log, discarding the checkpoints for TPs it is not 
handling.  This adds a bit of cost to the job startup, but the checkpoint log 
is not large (and its total size in bytes is not changing, just the number of 
entries in it).  Also, we can continue to partition the log by some reasonable 
number (initial partition count?) so that in the standard grouped-by-partition 
scheme, containers only have to scan the partitions for which they are 
responsible.  Finally, the key-dedupe feature we rely on for state will assist 
us here as well, pruning out old offsets.  The concern is that, since the 
offsets are no longer stored as a group, individual offsets may age out 
separately from their fellows, but this problem is no larger than the current 
one, where whole groups could disappear.
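
To make the proposed log format concrete, here is a rough sketch in Python (not 
Samza code).  The names CheckpointEntry and restore_offsets are hypothetical, 
the log is modeled as a plain in-memory list, and it assumes that a later entry 
for a TP supersedes any earlier one.

```python
from typing import Dict, Iterable, NamedTuple, Set, Tuple

# A topic-partition (TP) is modeled as a (topic, partition) pair.
TopicPartition = Tuple[str, int]


class CheckpointEntry(NamedTuple):
    """One log entry: a single TP and its offset (hypothetical layout)."""
    tp: TopicPartition
    offset: int


def restore_offsets(log: Iterable[CheckpointEntry],
                    owned: Set[TopicPartition]) -> Dict[TopicPartition, int]:
    """Replay the whole log, keeping the latest offset for each owned TP."""
    offsets: Dict[TopicPartition, int] = {}
    for entry in log:
        # Entries for TPs this container does not handle are discarded.
        if entry.tp in owned:
            # A later entry for the same TP overwrites the earlier one.
            offsets[entry.tp] = entry.offset
    return offsets


log = [
    CheckpointEntry(("AdViews", 0), 10),
    CheckpointEntry(("AdClicks", 0), 4),
    CheckpointEntry(("AdViews", 1), 7),
    CheckpointEntry(("AdViews", 0), 25),
]
restored = restore_offsets(log, owned={("AdViews", 0), ("AdClicks", 0)})
```

Because each entry carries exactly one TP, the replay does not need to know how 
TPs were grouped into TIs when the checkpoint was written.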

Three partitioning strategies that appear immediately useful:
* GROUP_BY_PARTITION - Our current strategy. Each TI is defined by its 
partition and all TPs with that partition are funneled through it.
* ONE_TI_PER_TP - Each TP gets its own TI.  The highest level of fanout 
possible and the maximum amount of granularity we can have with a Samza job.  
The AM can then round robin the assignment of the TPs to the containers, 
getting an even distribution of TPs across the job.  This would be useful for 
consuming lots and lots of TPs that do not need to coordinate or group in any 
way.
* ONE_TI_FOR_ALL_TP - One TI for all the TPs (on a container).  By then setting 
the number of containers across the job to 1, all the messages would then be 
funneled through the single container.  This would be useful for jobs that want 
an all-encompassing view of the data flow, even if it's partitioned (otherwise 
we'd need to go through a repartitioning step and incur the latency and data 
storage cost).
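
As a rough illustration of how a container might turn its assigned TPs into 
TIs under each strategy, here is a sketch in Python (not Samza code).  The 
function group_tps and the TI keys it returns are hypothetical; only the three 
strategy names come from the list above.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Tuple

# A topic-partition (TP) is modeled as a (topic, partition) pair.
TopicPartition = Tuple[str, int]


def group_tps(strategy: str,
              tps: List[TopicPartition]) -> Dict[Hashable, List[TopicPartition]]:
    """Map each task-instance key to the TPs that task instance would consume."""
    groups: Dict[Hashable, List[TopicPartition]] = defaultdict(list)
    for topic, partition in sorted(tps):
        if strategy == "GROUP_BY_PARTITION":
            key = partition            # one TI per partition number
        elif strategy == "ONE_TI_PER_TP":
            key = (topic, partition)   # one TI per TP
        elif strategy == "ONE_TI_FOR_ALL_TP":
            key = "all"                # a single TI for every TP on the container
        else:
            raise ValueError(f"unknown strategy: {strategy}")
        groups[key].append((topic, partition))
    return dict(groups)


tps = [("IS1", 0), ("IS1", 1), ("IS2", 0), ("IS2", 1)]
by_partition = group_tps("GROUP_BY_PARTITION", tps)  # 2 TIs, two TPs each
per_tp = group_tps("ONE_TI_PER_TP", tps)             # 4 TIs, one TP each
single = group_tps("ONE_TI_FOR_ALL_TP", tps)         # 1 TI with all four TPs
```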

(Additionally, it would probably be useful to provide a pluggable interface for 
matching TIs to TPs.  For example, some may wish to group by topic name prefix 
(e.g., an org uses "DATA-CENTER-x-TOPIC-NAME-FOO" as its Kafka topic naming 
convention and wants to group by data center via a topic name prefix).  This 
can be done at a later time.)
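
One possible shape for such a pluggable interface, sketched in Python (not 
Samza code): a grouper is just a function from a TP to a TI key.  The names 
Grouper, by_data_center, and assign are hypothetical, and the prefix rule 
assumes topics are named "DATA-CENTER-<x>-..." so that the first three 
dash-separated tokens identify the data center.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, List, Tuple

# A topic-partition (TP) is modeled as a (topic, partition) pair.
TopicPartition = Tuple[str, int]
# A grouper maps a TP to the key of the task instance that should consume it.
Grouper = Callable[[TopicPartition], Hashable]


def by_data_center(tp: TopicPartition) -> str:
    """Assumed naming rule: keep the first three dash-separated tokens."""
    topic, _partition = tp
    return "-".join(topic.split("-")[:3])


def assign(tps: List[TopicPartition],
           grouper: Grouper) -> Dict[Hashable, List[TopicPartition]]:
    """Group TPs into task instances using whatever grouper is plugged in."""
    groups: Dict[Hashable, List[TopicPartition]] = defaultdict(list)
    for tp in sorted(tps):
        groups[grouper(tp)].append(tp)
    return dict(groups)


tps = [
    ("DATA-CENTER-1-TOPIC-NAME-FOO", 0),
    ("DATA-CENTER-1-TOPIC-NAME-BAR", 0),
    ("DATA-CENTER-2-TOPIC-NAME-FOO", 0),
]
groups = assign(tps, by_data_center)
```

Under this shape, the built-in strategies would just be particular groupers, 
for example `lambda tp: tp[1]` for grouping by partition.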

Once these strategy concepts exist, the AM can pass the strategy to the 
container as a parameter.  The container then can instantiate the correct 
number of TIs, assigning the correct TPs as it goes.
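
A sketch of the AM side in Python (not Samza code), assuming the 
ONE_TI_PER_TP case where TPs can be dealt out round robin.  The functions 
round_robin and container_directives and the dict layout of the directive are 
hypothetical.  Round robin would not be appropriate for GROUP_BY_PARTITION, 
since it can split TPs with the same partition across containers.

```python
from typing import Dict, List, Tuple

# A topic-partition (TP) is modeled as a (topic, partition) pair.
TopicPartition = Tuple[str, int]


def round_robin(tps: List[TopicPartition],
                num_containers: int) -> Dict[int, List[TopicPartition]]:
    """Deal TPs out to containers one at a time, in sorted order."""
    assignment: Dict[int, List[TopicPartition]] = {
        c: [] for c in range(num_containers)
    }
    for i, tp in enumerate(sorted(tps)):
        assignment[i % num_containers].append(tp)
    return assignment


def container_directives(strategy: str,
                         tps: List[TopicPartition],
                         num_containers: int) -> Dict[int, dict]:
    """What the AM would hand each container: the strategy plus its TPs."""
    return {
        container: {"strategy": strategy, "tps": owned}
        for container, owned in round_robin(tps, num_containers).items()
    }


# Two input streams with 4 and 8 partitions, spread over two containers.
tps = [("IS1", p) for p in range(4)] + [("IS2", p) for p in range(8)]
directives = container_directives("ONE_TI_PER_TP", tps, num_containers=2)
```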

The immediate tasks are to refactor the checkpoint log to be independent of 
partitioning, and to introduce the partitioning strategy to the AM and 
containers.

How does this sound?

> Support new partitioning strategies
> -----------------------------------
>
>                 Key: SAMZA-71
>                 URL: https://issues.apache.org/jira/browse/SAMZA-71
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>              Labels: project
>
> Currently, the number of stream task instances that are created for a Samza 
> job is equal to the max number of partitions across all input streams. For 
> example, if your Samza job is using two YARN containers, and has two input 
> streams (IS1 and IS2), and IS1 has 4 partitions, and IS2 has 8 partitions, 
> then the Samza job will have a total of max(4,8)=8 partitions. Therefore, 8 
> StreamTask instances would be created (spread as evenly as possible across 
> two YARN containers).
> This scheme works for co-grouping when both input streams have the same 
> number of partitions, and are partitioned using the same partitioning scheme 
> (e.g. a Kafka partitioner that partitions by member ID). The parallelism of 
> the job is limited by the number of StreamTask instances, which means that 
> the parallelism is limited by the max number of partitions across all input 
> streams (8 in the example above).
> We can actually do better than these guarantees. We should change Samza's 
> partitioning style to behave in the following way:
> 1. Support a task.partition.scheme=max config. Samza will create one stream 
> task instance for each input stream partition. In the example above, IS1 has 
> 4 partitions, and IS2 has 8 partitions, so Samza would create 4+8=12 
> StreamTasks. Each input stream's partition would be mapped to a unique 
> StreamTask that would receive messages ONLY from that input stream/partition 
> pair, and from no other. Using this style of partitioning increases a Samza 
> job's possible parallelism to be the absolute maximum (based on Kafka 
> semantics, which limit a single consumer for each input stream/partition 
> pair).
> 2. Support a task.partition.scheme=cogroup config. Samza will create as many 
> stream task instances as the greatest common divisor (GCD) of all input 
> stream partition counts. In the example above, IS1 has 4 partitions, 
> and IS2 has 8. GCD(4,8)=4, so Samza would create four partitions. If IS1 had 
> 4 partitions, and IS2 had 6, then GCD(4,6)=2, so the Samza job would have two 
> StreamTask instances. Using this style can decrease a Samza job's 
> parallelism, but provides the guarantee that a StreamTask instance will 
> receive all messages across all input streams for a key that it's in charge 
> of. For example, if a StreamTask is consuming AdViews and AdClicks, and both 
> are partitioned by member ID, but AdViews has 12 partitions, and AdClicks has 
> 8 partitions, then there will be 4 StreamTask instances, and each instance 
> will receive roughly 1/4th of all clicks and views, and all clicks and views 
> for a given member ID will be mapped to just one of the StreamTasks, so 
> aggregation/joining will be possible.
> The default task.partition.scheme will be max, when the user hasn't specified 
> a partition scheme. Thus, the default will not allow any aggregation or 
> joining across input streams.
> With both of these styles, we can still use the Partition class (and 
> getPartitionId) to identify each StreamTask instance, but we will need to 
> devise a deterministic way to map from each input stream/partition pair to 
> each StreamTask partition.
> In the case of style #1 (max), consider the case where we have IS1 with 4 
> partitions and IS2 with 8 partitions. We can use the order of task.inputs to 
> define an ordering across stream names. We can then instantiate all 12 
> StreamTasks, and simply iterate over all input streams based on their 
> task.inputs order and sorted partition sets to do the mapping. If we had 
> task.inputs=IS2,IS1, the mapping would look like this:
> IS2:0 - StreamTask:0
> IS2:1 - StreamTask:1
> IS2:2 - StreamTask:2
> IS2:3 - StreamTask:3
> IS2:4 - StreamTask:4
> IS2:5 - StreamTask:5
> IS2:6 - StreamTask:6
> IS2:7 - StreamTask:7
> IS1:0 - StreamTask:8
> IS1:1 - StreamTask:9
> IS1:2 - StreamTask:10
> IS1:3 - StreamTask:11
> In the case of style #2 (cogroup), consider the case where IS1 has 8 
> partitions and IS2 has 12 partitions. GCD(8,12)=4, so 4 StreamTasks would be 
> created. The mapping in this case should then be:
> IS1:0 - StreamTask:0
> IS1:1 - StreamTask:1
> IS1:2 - StreamTask:2
> IS1:3 - StreamTask:3
> IS1:4 - StreamTask:0
> IS1:5 - StreamTask:1
> IS1:6 - StreamTask:2
> IS1:7 - StreamTask:3
> IS2:0 - StreamTask:0
> IS2:1 - StreamTask:1
> IS2:2 - StreamTask:2
> IS2:3 - StreamTask:3
> IS2:4 - StreamTask:0
> IS2:5 - StreamTask:1
> IS2:6 - StreamTask:2
> IS2:7 - StreamTask:3
> IS2:8 - StreamTask:0
> IS2:9 - StreamTask:1
> IS2:10 - StreamTask:2
> IS2:11 - StreamTask:3
> As you can see, the assignment is done by modding each input stream's 
> partition number by the GCD value (4, in this case). This assignment strategy 
> has the nice guarantee that keys will map to the same StreamTask across input 
> streams with different partition counts (provided that they're partitioned by 
> the same key). For example, member ID 1213 % 8 = partition 5 in IS1, and 
> 1213 % 12 = partition 1 in IS2. If you then mod by the GCD (4), you get 5%4=1 and 
> 1%4=1. The same holds true for other keys, as well.
> 1211%8=3 .. 3%4=3
> 1211%12=11 .. 11%4=3
> Both of these partition assignment schemes work only as long as the 
> task.inputs stream order is static (or new streams are appended to the end), 
> and each input stream's partition count is static and will never change.
> You can use the Euclidean algorithm to find the GCD:
> http://www-math.ucdenver.edu/~wcherowi/courses/m5410/exeucalg.html
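
The two mappings in the quoted description can be checked with a short Python 
sketch (not Samza code).  The functions max_mapping and cogroup_mapping are 
hypothetical names; inputs are given as (stream, partition count) pairs in 
task.inputs order, and the GCD comes from Python's math.gcd rather than a 
hand-written Euclidean algorithm.

```python
from functools import reduce
from math import gcd
from typing import Dict, List, Tuple

# (stream name, partition count), listed in task.inputs order.
Inputs = List[Tuple[str, int]]
# (stream name, partition) -> StreamTask number.
Mapping = Dict[Tuple[str, int], int]


def max_mapping(inputs: Inputs) -> Mapping:
    """Style #1 (max): every stream/partition pair gets its own StreamTask."""
    mapping: Mapping = {}
    next_task = 0
    for stream, count in inputs:
        for partition in range(count):
            mapping[(stream, partition)] = next_task
            next_task += 1
    return mapping


def cogroup_mapping(inputs: Inputs) -> Mapping:
    """Style #2 (cogroup): partition number modulo the GCD of all counts."""
    tasks = reduce(gcd, (count for _stream, count in inputs))
    return {
        (stream, partition): partition % tasks
        for stream, count in inputs
        for partition in range(count)
    }


max_map = max_mapping([("IS2", 8), ("IS1", 4)])          # 12 StreamTasks
cogroup_map = cogroup_mapping([("IS1", 8), ("IS2", 12)])  # GCD(8, 12) = 4

# Member ID 1213 lands on the same StreamTask through either input stream.
task_via_is1 = cogroup_map[("IS1", 1213 % 8)]
task_via_is2 = cogroup_map[("IS2", 1213 % 12)]
```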



