[ https://issues.apache.org/jira/browse/SAMZA-71?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13864634#comment-13864634 ]

Jakob Homan commented on SAMZA-71:
----------------------------------

bq. 2. Can you do the new KafkaCheckpointManager as a separate class (keep the 
old one)? This will make it easier for us to migrate existing jobs. Once 
committed, we can mark the old one as deprecated, and open a new JIRA to remove 
it.
My preference would be to nuke the old class.  Existing jobs can either rename 
themselves or manually delete the checkpoint log (it would be great if Kafka 
had a way to force log deletion, but according to Jun that's still a ways off; 
see KAFKA-330).  Do you think there are enough Samza jobs in the wild for which 
this would be an insurmountable problem to justify the extra cost and code?  If 
there are enough, it would be easy to write a quick one-off tool to generate a 
new-format log from the old-style one.  Maybe the log should have a version 
number added to its name (in the absence of metadata capabilities about the 
Kafka log).  My concern is that we're iterating on this code pretty quickly; it 
would be easy to start accumulating old-style readers and writers.  Once we've 
got a release out, it'll be important to provide this type of backwards 
compatibility support.  
bq. 4. We'd have to think about the pluggability part of this. The problem with 
the DATA-CENTER example is that, not all partitions for a given data center are 
guaranteed to be assigned to a single container. Essentially, there's two 
levels of assignment going on: partition -> container, and partition -> TI. We 
only control the partition -> TI mapping with this setting, and if the 
partition -> container mapping is incorrect, you're going to get weird (wrong) 
results.
In this example, the partition-to-container mapping could be guaranteed; that 
would be up to the implementor.  All the pluggable function would do is take a set of TPs 
and group them into inputs for a particular TI.  For example, assuming four 
topics (DC1-Pageview, DC2-Pageview, DC1-Click, DC2-Click), each partitioned two 
ways, the function could either generate two sets {noformat}{DC1-Pageview/0, 
DC1-Click/0, DC1-Pageview/1, DC1-Click/1},  {DC2-Pageview/0, DC2-Click/0, 
DC2-Pageview/1, DC2-Click/1}{noformat} or four {noformat} {DC1-Pageview/0, 
DC1-Click/0}, {DC1-Pageview/1, DC1-Click/1}, {DC2-Pageview/0, DC2-Click/0}, 
{DC2-Pageview/1, DC2-Click/1} {noformat} to preserve the partition grouping 
(i.e., {{GROUP BY substring(topic, 3)}} versus {{GROUP BY substring(topic, 3), 
partition}}).  Either way, each of those sets would result in the creation of a 
new TI, to be run somewhere.  The pluggable code would have access to all the 
TPs and could group as appropriate.
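Roughly, the pluggable piece could be as small as this (a quick Scala sketch; 
the TPGrouper trait and class names here are made up, not an existing Samza 
interface):
{noformat}
// Illustrative only: take every topic/partition (TP) the job consumes and
// return groups, each group becoming the input set for one TaskInstance (TI).
case class TP(topic: String, partition: Int)

trait TPGrouper {
  def group(tps: Set[TP]): Set[Set[TP]]
}

// GROUP BY data center only: two groups, {DC1-*} and {DC2-*}.
class DataCenterGrouper extends TPGrouper {
  def group(tps: Set[TP]) = tps.groupBy(_.topic.take(3)).values.toSet
}

// GROUP BY data center and partition: four groups, one per (DC, partition).
class DataCenterPartitionGrouper extends TPGrouper {
  def group(tps: Set[TP]) =
    tps.groupBy(tp => (tp.topic.take(3), tp.partition)).values.toSet
}
{noformat}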
bq. 5. Do you want to sub-task this to keep the checkpoint refactor and the 
partition strategy changes?
Yep.

> Support new partitioning strategies
> -----------------------------------
>
>                 Key: SAMZA-71
>                 URL: https://issues.apache.org/jira/browse/SAMZA-71
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>              Labels: project
>
> Currently, the number of stream task instances that are created for a Samza 
> job is equal to the max number of partitions across all input streams. For 
> example, if your Samza job is using two YARN containers, and has two input 
> streams (IS1 and IS2), and IS1 has 4 partitions, and IS2 has 8 partitions, 
> then the Samza job will have a total of max(4,8)=8 partitions. Therefore, 8 
> StreamTask instances would be created (spread as evenly as possible across 
> two YARN containers).
> This scheme works for co-grouping when both input streams have the same 
> number of partitions, and are partitioned using the same partitioning scheme 
> (e.g. a Kafka partitioner that partitions by member ID). The parallelism of 
> the job is limited by the number of StreamTask instances, which means that 
> the parallelism is limited by the max number of partitions across all input 
> streams (8 in the example above).
> We can actually do better than these guarantees. We should change Samza's 
> partitioning style to behave in the following way:
> 1. Support a task.partition.scheme=max config. Samza will create one stream 
> task instance for each input stream partition. In the example above, IS1 has 
> 4 partitions, and IS2 has 8 partitions, so Samza would create 4+8=12 
> StreamTasks. Each input stream's partition would be mapped to a unique 
> StreamTask that would receive messages ONLY from that input stream/partition 
> pair, and from no other. Using this style of partitioning increases a Samza 
> job's possible parallelism to be the absolute maximum (based on Kafka 
> semantics, which allow only a single consumer for each input stream/partition 
> pair).
> 2. Support a task.partition.scheme=cogroup config. Samza will create a number 
> of stream task instances equal to the greatest common divisor (GCD) of all 
> input stream partition counts. For example, in the example above, IS1 has 4 
> partitions, and IS2 has 8. GCD(4,8)=4, so Samza would create four StreamTask 
> instances. If IS1 had 
> 4 partitions, and IS2 had 6, then GCD(4,6)=2, so the Samza job would have two 
> StreamTask instances. Using this style can decrease a Samza job's 
> parallelism, but provides the guarantee that a StreamTask instance will 
> receive all messages across all input streams for a key that it's in charge 
> of. For example, if a StreamTask is consuming AdViews and AdClicks, and both 
> are partitioned by member ID, but AdViews has 12 partitions, and AdClicks has 
> 8 partitions, then there will be 4 StreamTask instances, and each instance 
> will receive roughly 1/4th of all clicks and views, and all clicks and views 
> for a given member ID will be mapped to just one of the StreamTasks, so 
> aggregation/joining will be possible.
> The default task.partition.scheme will be max when the user hasn't specified 
> a partition scheme. Thus, the default will not allow any aggregation or 
> joining across input streams.
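> As a config sketch (task.partition.scheme is the property proposed here; it 
> does not exist yet):
> {noformat}
> task.inputs=IS2,IS1
> # proposed values: "max" (default) or "cogroup"
> task.partition.scheme=cogroup
> {noformat}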
> With both of these styles, we can still use the Partition class (and 
> getPartitionId) to identify each StreamTask instance, but we will need to 
> devise a deterministic way to map from each input stream/partition pair to 
> each StreamTask partition.
> In the case of style #1 (max), consider the case where we have IS1 with 4 
> partitions and IS2 with 8 partitions. We can use the order of task.inputs to 
> define an ordering across stream names. We can then instantiate all 12 
> StreamTasks, and simply iterate over all input streams based on their 
> task.inputs order and sorted partition sets to do the mapping. If we had 
> task.inputs=IS2,IS1, the mapping would look like this:
> IS2:0 - StreamTask:0
> IS2:1 - StreamTask:1
> IS2:2 - StreamTask:2
> IS2:3 - StreamTask:3
> IS2:4 - StreamTask:4
> IS2:5 - StreamTask:5
> IS2:6 - StreamTask:6
> IS2:7 - StreamTask:7
> IS1:0 - StreamTask:8
> IS1:1 - StreamTask:9
> IS1:2 - StreamTask:10
> IS1:3 - StreamTask:11
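> A rough sketch of how this mapping could be computed (Scala; the method and 
> parameter names are illustrative, not existing Samza code):
> {noformat}
> // Assign a unique StreamTask id to every (stream, partition) pair, walking
> // streams in task.inputs order and partitions in ascending order.
> //   inputs:          streams in task.inputs order, e.g. Seq("IS2", "IS1")
> //   partitionCounts: partitions per stream, e.g. Map("IS2" -> 8, "IS1" -> 4)
> def maxMapping(inputs: Seq[String],
>                partitionCounts: Map[String, Int]): Map[(String, Int), Int] = {
>   val pairs = for {
>     stream    <- inputs
>     partition <- 0 until partitionCounts(stream)
>   } yield (stream, partition)
>   pairs.zipWithIndex.toMap   // (IS2,0) -> 0, ..., (IS1,3) -> 11
> }
> {noformat}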
> In the case of style #2 (cogroup), consider the case where IS1 has 8 
> partitions and IS2 has 12 partitions. GCD(8,12)=4, so 4 StreamTasks would be 
> created. The mapping in this case should then be:
> IS1:0 - StreamTask:0
> IS1:1 - StreamTask:1
> IS1:2 - StreamTask:2
> IS1:3 - StreamTask:3
> IS1:4 - StreamTask:0
> IS1:5 - StreamTask:1
> IS1:6 - StreamTask:2
> IS1:7 - StreamTask:3
> IS2:0 - StreamTask:0
> IS2:1 - StreamTask:1
> IS2:2 - StreamTask:2
> IS2:3 - StreamTask:3
> IS2:4 - StreamTask:0
> IS2:5 - StreamTask:1
> IS2:6 - StreamTask:2
> IS2:7 - StreamTask:3
> IS2:8 - StreamTask:0
> IS2:9 - StreamTask:1
> IS2:10 - StreamTask:2
> IS2:11 - StreamTask:3
> As you can see, the assignment is done by modding each input stream's 
> partition number by the GCD value (4, in this case). This assignment strategy 
> has the nice guarantee that keys will map to the same StreamTask across input 
> streams with different partition counts (provided that they're partitioned by 
> the same key). For example, member ID 1213 % 8 = partition 5 in IS1, and 1213 
> % 12 = partition 1 in IS2. If you then mod by the GCD (4), you get 5%4=1 and 
> 1%4=1. The same holds true for other keys, as well.
> 1211%8=3 .. 3%4=3
> 1211%12=11 .. 11%4=3
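> In other words, the cogroup mapping is just (partition % GCD); this works 
> because the GCD divides every stream's partition count, so (key % partitions) 
> % GCD == key % GCD for each input stream. A minimal sketch (Scala; names are 
> illustrative):
> {noformat}
> // One StreamTask per GCD of all partition counts; each stream/partition pair
> // maps to StreamTask (partition % gcd), so the same key always lands on the
> // same StreamTask regardless of the stream's partition count.
> def cogroupMapping(partitionCounts: Map[String, Int]): Map[(String, Int), Int] = {
>   val gcd = partitionCounts.values.map(BigInt(_)).reduce(_ gcd _).toInt
>   val pairs = for {
>     (stream, count) <- partitionCounts.toSeq
>     partition       <- 0 until count
>   } yield ((stream, partition), partition % gcd)
>   pairs.toMap
> }
> {noformat}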
> Both of these partition assignment schemes work only as long as the 
> task.inputs stream order is static (or new streams are appended only to the 
> end), and each input stream's partition count is static and will never change.
> You can use the Euclidean algorithm to find the GCD:
> http://www-math.ucdenver.edu/~wcherowi/courses/m5410/exeucalg.html
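> A few-line version of the Euclidean algorithm (Scala):
> {noformat}
> // Euclidean algorithm: gcd(a, b) = gcd(b, a % b), terminating when b == 0.
> def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
>
> // Fold it over all input partition counts, e.g. List(8, 12).reduce(gcd) == 4.
> def gcdAll(counts: Seq[Int]): Int = counts.reduce(gcd)
> {noformat}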



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
