[ https://issues.apache.org/jira/browse/SAMZA-123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13864997#comment-13864997 ]

Jakob Homan commented on SAMZA-123:
-----------------------------------

If we treat the assignment of topic-partitions to task instances as a group-by
operation, it's easy to obtain the three most useful strategies: grouping by
partition, one task instance per topic-partition (highest granularity), and one
task instance for all topic-partitions (lowest granularity):

Summarized as code:
{code}import scala.collection.immutable._

case class TopicPartition(topic: String, partition: Int)

/** Splits a job's topic-partitions into groups; each group becomes one task instance. */
trait TopicPartitionGrouper {
  def group(tps: Set[TopicPartition]): Set[Set[TopicPartition]]
}

val allTps = Set(TopicPartition("DC1-Pageview", 0),
                 TopicPartition("DC1-Click", 0),
                 TopicPartition("DC2-Pageview", 1),
                 TopicPartition("DC2-Click", 1),
                 TopicPartition("DC1-Pageview", 2),
                 TopicPartition("DC3-Combined", 0))

/** Our current approach: one task instance per partition number. */
class GroupByPartition extends TopicPartitionGrouper {
  def group(tps: Set[TopicPartition]): Set[Set[TopicPartition]] =
    tps.groupBy(_.partition).values.toSet
}

/** One task instance per topic prefix (first three characters, e.g. "DC1"). */
class GroupByTopicPrefix extends TopicPartitionGrouper {
  // take(3) instead of subSequence(0, 3): no exception on topic names shorter than 3 chars
  def group(tps: Set[TopicPartition]): Set[Set[TopicPartition]] =
    tps.groupBy(_.topic.take(3)).values.toSet
}

/** One task instance per (topic prefix, partition) pair. */
class GroupByTopicPrefixAndPartition extends TopicPartitionGrouper {
  def group(tps: Set[TopicPartition]): Set[Set[TopicPartition]] =
    tps.groupBy(tp => (tp.topic.take(3), tp.partition)).values.toSet
}

/** One task instance per topic-partition (highest granularity). */
class OnePerTI extends TopicPartitionGrouper {
  def group(tps: Set[TopicPartition]): Set[Set[TopicPartition]] = tps.map(tp => Set(tp))
}

/** A single task instance for all topic-partitions (lowest granularity). */
class OneBigTI extends TopicPartitionGrouper {
  def group(tps: Set[TopicPartition]): Set[Set[TopicPartition]] = Set(tps)
}

val strategies: Map[String, TopicPartitionGrouper] =
  Map("partition" -> new GroupByPartition,
      "topicPrefix" -> new GroupByTopicPrefix,
      "topicPrefixAndPartition" -> new GroupByTopicPrefixAndPartition,
      "onePerTI" -> new OnePerTI,
      "oneBigTI" -> new OneBigTI)

for ((name, grouper) <- strategies) {
  println("For strategy " + name + " we would get the following task instances:")
  grouper.group(allTps).foreach(g => println("  TI: " + g))
}{code}
Running it gives each of the groupings we would want:
{noformat}(M=d929d1) jhoman@JHOMAN-MN s005 ~/scalas/scala-2.8.2.final> bin/scala ~/explain.scala
For strategy partition we would get the following task instances:
  TI: Set(TopicPartition(DC1-Pageview,2))
  TI: Set(TopicPartition(DC2-Click,1), TopicPartition(DC2-Pageview,1))
  TI: Set(TopicPartition(DC3-Combined,0), TopicPartition(DC1-Pageview,0), TopicPartition(DC1-Click,0))
For strategy oneBigTI we would get the following task instances:
  TI: Set(TopicPartition(DC1-Pageview,2), TopicPartition(DC2-Click,1), TopicPartition(DC2-Pageview,1), TopicPartition(DC3-Combined,0), TopicPartition(DC1-Pageview,0), TopicPartition(DC1-Click,0))
For strategy topicPrefixAndPartition we would get the following task instances:
  TI: Set(TopicPartition(DC3-Combined,0))
  TI: Set(TopicPartition(DC1-Pageview,0), TopicPartition(DC1-Click,0))
  TI: Set(TopicPartition(DC1-Pageview,2))
  TI: Set(TopicPartition(DC2-Click,1), TopicPartition(DC2-Pageview,1))
For strategy topicPrefix we would get the following task instances:
  TI: Set(TopicPartition(DC3-Combined,0))
  TI: Set(TopicPartition(DC2-Click,1), TopicPartition(DC2-Pageview,1))
  TI: Set(TopicPartition(DC1-Pageview,2), TopicPartition(DC1-Pageview,0), TopicPartition(DC1-Click,0))
For strategy onePerTI we would get the following task instances:
  TI: Set(TopicPartition(DC1-Pageview,2))
  TI: Set(TopicPartition(DC2-Click,1))
  TI: Set(TopicPartition(DC2-Pageview,1))
  TI: Set(TopicPartition(DC3-Combined,0))
  TI: Set(TopicPartition(DC1-Pageview,0))
  TI: Set(TopicPartition(DC1-Click,0)){noformat}
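Each strategy in the code above differs only in the key it groups on. A minimal sketch of that observation, written as a Scala script in the same style; `KeyedGrouper` and `isPartitionOf` are hypothetical helpers for illustration, not Samza APIs. The invariant check captures what any grouper must guarantee: every input topic-partition lands in exactly one non-empty group.

```scala
// Hypothetical sketch: each strategy is a groupBy that differs only in its key.
case class TopicPartition(topic: String, partition: Int)

trait TopicPartitionGrouper {
  def group(tps: Set[TopicPartition]): Set[Set[TopicPartition]]
}

// Hypothetical helper: a grouper fully described by the key it groups on.
class KeyedGrouper[K](key: TopicPartition => K) extends TopicPartitionGrouper {
  def group(tps: Set[TopicPartition]): Set[Set[TopicPartition]] =
    tps.groupBy(key).values.toSet
}

val byPartition = new KeyedGrouper[Int](_.partition)
val byTopicPrefix = new KeyedGrouper[String](_.topic.take(3))
val byTopicPrefixAndPartition =
  new KeyedGrouper[(String, Int)](tp => (tp.topic.take(3), tp.partition))
val onePerTI = new KeyedGrouper[TopicPartition](tp => tp) // finest: each TP is its own key
val oneBigTI = new KeyedGrouper[Unit](_ => ())            // coarsest: one key shared by all TPs

// Hypothetical invariant: every input TP lands in exactly one non-empty group.
def isPartitionOf(groups: Set[Set[TopicPartition]], tps: Set[TopicPartition]): Boolean =
  groups.forall(_.nonEmpty) &&
    groups.toList.map(_.size).sum == tps.size && // sizes add up to the input size...
    groups.flatten == tps                        // ...and the union is the input, so no overlaps

val allTps = Set(TopicPartition("DC1-Pageview", 0), TopicPartition("DC1-Click", 0),
                 TopicPartition("DC2-Pageview", 1), TopicPartition("DC2-Click", 1),
                 TopicPartition("DC1-Pageview", 2), TopicPartition("DC3-Combined", 0))

val strategies: List[(String, TopicPartitionGrouper)] = List(
  "partition" -> byPartition,
  "topicPrefix" -> byTopicPrefix,
  "topicPrefixAndPartition" -> byTopicPrefixAndPartition,
  "onePerTI" -> onePerTI,
  "oneBigTI" -> oneBigTI)

for ((name, grouper) <- strategies) {
  val groups = grouper.group(allTps)
  println(name + ": " + groups.size + " task instances, valid = " + isPartitionOf(groups, allTps))
}
```

The key function alone sets the granularity: the identity key yields one task instance per topic-partition and a constant key yields a single task instance. One edge case differs from `OneBigTI` as written above: on an empty input, `groupBy` produces no groups at all, whereas `Set(tps)` produces one empty group.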

This JIRA changes the AM so that the topic-partitions it passes to the
containers are already grouped by the chosen grouping function. The containers
in turn directly instantiate one task instance per provided group, so they do
not need to know which grouping function was used. Additionally, with the
changes from SAMZA-122, it's possible to change the strategy for a job after it
has been run once, in order to improve performance.
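The AM/container split described above can be sketched as follows, assuming a hypothetical round-robin placement of groups onto a fixed number of containers. `assignToContainers`, `TaskInstance` and `SketchContainer` are illustrative names, not Samza's classes, and the real AM may place groups differently. The point the sketch shows is that the container receives finished groups and only maps each one to a task instance.

```scala
// Hypothetical sketch of the AM/container split; these are not Samza classes.
case class TopicPartition(topic: String, partition: Int)

// Hypothetical stand-in for what a container creates for each group.
case class TaskInstance(inputs: Set[TopicPartition])

// AM side (hypothetical): groups are already computed; spread them round-robin
// over the containers. Groups are sorted first so placement is deterministic.
def assignToContainers(groups: Set[Set[TopicPartition]],
                       containerCount: Int): Map[Int, Set[Set[TopicPartition]]] = {
  require(containerCount > 0, "need at least one container")
  val ordered = groups.toList.sortBy(g =>
    g.toList.map(tp => tp.topic + "/" + tp.partition).sorted.mkString(","))
  val indexed = ordered.zipWithIndex
  (0 until containerCount).map { c =>
    val assigned: Set[Set[TopicPartition]] =
      indexed.collect { case (g, i) if i % containerCount == c => g }.toSet
    c -> assigned
  }.toMap
}

// Container side (hypothetical): one task instance per received group.
// The container never sees the grouping function, only its output.
class SketchContainer(groups: Set[Set[TopicPartition]]) {
  val taskInstances: Set[TaskInstance] = groups.map(g => TaskInstance(g))
}

val allTps = Set(TopicPartition("DC1-Pageview", 0), TopicPartition("DC1-Click", 0),
                 TopicPartition("DC2-Pageview", 1), TopicPartition("DC2-Click", 1),
                 TopicPartition("DC1-Pageview", 2), TopicPartition("DC3-Combined", 0))

// Here the AM groups by partition (the current strategy); any grouper would do.
val groups: Set[Set[TopicPartition]] = allTps.groupBy(_.partition).values.toSet
val assignment = assignToContainers(groups, containerCount = 2)
val containers = assignment.map { case (id, gs) => id -> new SketchContainer(gs) }

for ((id, container) <- containers.toList.sortBy(_._1))
  println("Container " + id + ": " + container.taskInstances.map(_.inputs))
```

Because the container only consumes a `Set[Set[TopicPartition]]`, switching the AM to a different grouper in this sketch requires no container-side change.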



> Move topic partition grouping to the AM and generalize
> ------------------------------------------------------
>
>                 Key: SAMZA-123
>                 URL: https://issues.apache.org/jira/browse/SAMZA-123
>             Project: Samza
>          Issue Type: Sub-task
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Jakob Homan
>            Assignee: Jakob Homan
>             Fix For: 0.7.0
>
>
> Currently the AM sends a set of all the topics and partitions to the 
> container, which then groups them by partition and assigns each set to a task 
> instance. By moving the grouping to the AM, we can assign arbitrary groups to 
> task instances, which will allow more partitioning strategies, as discussed 
> in SAMZA-71.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
