Chris Riccomini created SAMZA-71:
------------------------------------

             Summary: Support new partitioning strategies
                 Key: SAMZA-71
                 URL: https://issues.apache.org/jira/browse/SAMZA-71
             Project: Samza
          Issue Type: Bug
          Components: container
    Affects Versions: 0.6.0
            Reporter: Chris Riccomini


Currently, the number of StreamTask instances created for a Samza job is equal 
to the max number of partitions across all input streams. For example, if your 
Samza job is using two YARN containers and has two input streams (IS1 and IS2), 
where IS1 has 4 partitions and IS2 has 8 partitions, then the Samza job will 
have a total of max(4,8)=8 partitions. Therefore, 8 StreamTask instances would 
be created (spread as evenly as possible across the two YARN containers).

This scheme works for co-grouping only when all input streams have the same 
number of partitions and are partitioned using the same partitioning scheme 
(e.g. a Kafka partitioner that partitions by member ID). The job's parallelism 
is limited by the number of StreamTask instances, which means it is limited by 
the max number of partitions across all input streams (8 in the example 
above).
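
For illustration, a rough sketch of the current task-count rule (hypothetical 
code, not the actual container logic; the class and method names are made up):

import java.util.Map;

public class CurrentScheme {
  // Current behavior: task count = max partition count across all inputs.
  static int taskCount(Map<String, Integer> partitionCounts) {
    return partitionCounts.values().stream()
        .mapToInt(Integer::intValue).max().orElse(0);
  }

  public static void main(String[] args) {
    System.out.println(taskCount(Map.of("IS1", 4, "IS2", 8))); // prints 8
  }
}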

We can actually do better than these guarantees. We should change Samza's 
partitioning style to behave in the following way:

1. Support a task.partition.scheme=max config. Samza will create one stream 
task instance for each input stream partition. In the example above, IS1 has 4 
partitions and IS2 has 8 partitions, so Samza would create 4+8=12 StreamTasks. 
Each input stream partition would be mapped to a unique StreamTask that would 
receive messages ONLY from that input stream/partition pair, and from no other. 
This style of partitioning increases a Samza job's possible parallelism to the 
absolute maximum (based on Kafka semantics, which limit each input 
stream/partition pair to a single consumer).

2. Support a task.partition.scheme=cogroup config. Samza will create a number 
of stream task instances equal to the greatest common divisor (GCD) of all 
input stream partition counts. In the example above, IS1 has 4 partitions and 
IS2 has 8; GCD(4,8)=4, so Samza would create four StreamTask instances. If IS1 
had 4 partitions and IS2 had 6, then GCD(4,6)=2, so the Samza job would have 
two StreamTask instances. Using this style can decrease a Samza job's 
parallelism, but provides the guarantee that a StreamTask instance will 
receive all messages, across all input streams, for the keys it's in charge 
of. For example, if a StreamTask is consuming AdViews and AdClicks, and both 
are partitioned by member ID, but AdViews has 12 partitions and AdClicks has 
8, then there will be GCD(12,8)=4 StreamTask instances, and each instance will 
receive roughly 1/4th of all clicks and views. All clicks and views for a 
given member ID will be mapped to just one of those StreamTasks, so 
aggregation/joining will be possible. (A sketch of both task-count 
computations follows this list.)
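
To make the two proposed task-count rules concrete, here's a rough sketch 
(hypothetical code, not an actual implementation; the ProposedSchemes class 
and method names are made up):

import java.math.BigInteger;
import java.util.Map;

public class ProposedSchemes {
  // task.partition.scheme=max: one StreamTask per input stream/partition pair.
  static int maxScheme(Map<String, Integer> partitionCounts) {
    return partitionCounts.values().stream().mapToInt(Integer::intValue).sum();
  }

  // task.partition.scheme=cogroup: task count = GCD of all partition counts.
  static int cogroupScheme(Map<String, Integer> partitionCounts) {
    return partitionCounts.values().stream().mapToInt(Integer::intValue)
        .reduce((a, b) -> BigInteger.valueOf(a).gcd(BigInteger.valueOf(b)).intValue())
        .orElse(0);
  }

  public static void main(String[] args) {
    Map<String, Integer> counts = Map.of("IS1", 4, "IS2", 8);
    System.out.println(maxScheme(counts));     // 4+8 = 12
    System.out.println(cogroupScheme(counts)); // GCD(4,8) = 4
  }
}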

When the user hasn't specified a partition scheme, task.partition.scheme will 
default to max. Thus, the default will not allow any aggregation or joining 
across input streams.

With both of these styles, we can still use the Partition class (and 
getPartitionId) to identify each StreamTask instance, but we will need to 
devise a deterministic way to map each input stream/partition pair to a 
StreamTask partition.

In the case of style #1 (max), consider the case where we have IS1 with 4 
partitions and IS2 with 8 partitions. We can use the order of task.inputs to 
define an ordering across stream names. We can then instantiate all 12 
StreamTasks and simply iterate over all input streams, based on their 
task.inputs order and sorted partition sets, to do the mapping. If we had 
task.inputs=IS2,IS1, the mapping would look like this:

IS2:0 - StreamTask:0
IS2:1 - StreamTask:1
IS2:2 - StreamTask:2
IS2:3 - StreamTask:3
IS2:4 - StreamTask:4
IS2:5 - StreamTask:5
IS2:6 - StreamTask:6
IS2:7 - StreamTask:7
IS1:0 - StreamTask:8
IS1:1 - StreamTask:9
IS1:2 - StreamTask:10
IS1:3 - StreamTask:11
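
A rough sketch of this assignment (hypothetical code; MaxSchemeMapping and 
assign are made-up names, and a real implementation would read the stream 
order from the task.inputs config):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MaxSchemeMapping {
  // Maps "stream:partition" to a StreamTask partition ID. 'inputs' preserves
  // the task.inputs ordering; each stream's partitions are assumed to be
  // 0..count-1 and are visited in sorted order.
  static Map<String, Integer> assign(List<String> inputs,
                                     Map<String, Integer> partitionCounts) {
    Map<String, Integer> mapping = new LinkedHashMap<>();
    int taskId = 0;
    for (String stream : inputs) {
      for (int p = 0; p < partitionCounts.get(stream); p++) {
        mapping.put(stream + ":" + p, taskId++);
      }
    }
    return mapping;
  }

  public static void main(String[] args) {
    // task.inputs=IS2,IS1 reproduces the table above:
    // IS2:0..7 -> StreamTask 0..7, IS1:0..3 -> StreamTask 8..11
    assign(List.of("IS2", "IS1"), Map.of("IS1", 4, "IS2", 8))
        .forEach((sp, t) -> System.out.println(sp + " - StreamTask:" + t));
  }
}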

In the case of style #2 (cogroup), consider the case where IS1 has 8 partitions 
and IS2 has 12 partitions. GCD(8,12)=4, so 4 StreamTasks would be created. The 
mapping in this case should then be:

IS1:0 - StreamTask:0
IS1:1 - StreamTask:1
IS1:2 - StreamTask:2
IS1:3 - StreamTask:3
IS1:4 - StreamTask:0
IS1:5 - StreamTask:1
IS1:6 - StreamTask:2
IS1:7 - StreamTask:3
IS2:0 - StreamTask:0
IS2:1 - StreamTask:1
IS2:2 - StreamTask:2
IS2:3 - StreamTask:3
IS2:4 - StreamTask:0
IS2:5 - StreamTask:1
IS2:6 - StreamTask:2
IS2:7 - StreamTask:3
IS2:8 - StreamTask:0
IS2:9 - StreamTask:1
IS2:10 - StreamTask:2
IS2:11 - StreamTask:3

As you can see, the assignment is done by modding each input stream's partition 
number by the GCD value (4, in this case). This assignment strategy has the 
nice guarantee that a key will map to the same StreamTask across input streams 
with different partition counts (provided that they're partitioned by the same 
key). For example, member ID 1213 % 8 = partition 5 in IS1, and 1213 % 12 = 
partition 1 in IS2. If you then mod by the GCD (4), you get 5 % 4 = 1 and 
1 % 4 = 1. The same holds true for other keys, as well:

1211 % 8 = 3, and 3 % 4 = 3
1211 % 12 = 11, and 11 % 4 = 3
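
Here's a small sketch that checks this alignment for the two member IDs above 
(hypothetical code; the partition counts come from the example, and the 
alignment holds because the GCD divides both partition counts):

import java.math.BigInteger;

public class CogroupAlignment {
  public static void main(String[] args) {
    int is1 = 8, is2 = 12; // partition counts from the example above
    int tasks = BigInteger.valueOf(is1).gcd(BigInteger.valueOf(is2)).intValue(); // 4

    // A key's partition (key % partitionCount), modded by the GCD, lands on
    // the same StreamTask in both streams.
    for (int memberId : new int[] {1213, 1211}) {
      int t1 = (memberId % is1) % tasks; // IS1 partition mapped to a task
      int t2 = (memberId % is2) % tasks; // IS2 partition mapped to a task
      System.out.println("member " + memberId + " -> IS1 task " + t1
          + ", IS2 task " + t2);
    }
    // Prints: member 1213 -> IS1 task 1, IS2 task 1
    //         member 1211 -> IS1 task 3, IS2 task 3
  }
}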

Both of these partition assignment schemes work only as long as the 
task.inputs stream order is static (or new streams are only ever appended to 
the end), and each input stream's partition count is static and never changes.

You can use the Euclidean algorithm to find the GCD:

http://www-math.ucdenver.edu/~wcherowi/courses/m5410/exeucalg.html
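
For reference, a minimal iterative Euclidean GCD in Java, folded across any 
number of partition counts (a generic sketch, not Samza code):

public class Gcd {
  // Iterative Euclidean algorithm: gcd(a, b) = gcd(b, a mod b) until b is 0.
  static int gcd(int a, int b) {
    while (b != 0) {
      int t = b;
      b = a % b;
      a = t;
    }
    return a;
  }

  // GCD across any number of partition counts; gcd(0, v) == v seeds the fold.
  static int gcd(int... values) {
    int g = 0;
    for (int v : values) g = gcd(g, v);
    return g;
  }

  public static void main(String[] args) {
    System.out.println(gcd(4, 8));  // 4
    System.out.println(gcd(4, 6));  // 2
    System.out.println(gcd(8, 12)); // 4
  }
}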


