[ 
https://issues.apache.org/jira/browse/SAMZA-1627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417798#comment-16417798
 ] 

ASF GitHub Bot commented on SAMZA-1627:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/456


> Watermark broadcast enhancements
> --------------------------------
>
>                 Key: SAMZA-1627
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1627
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Xinyu Liu
>            Assignee: Xinyu Liu
>            Priority: Major
>
> Currently, each upstream task must broadcast its watermark to every partition
> of the intermediate streams so that the consumers can aggregate watermarks.
> The message count is O(n^2): with 256 tasks and a 256-partition intermediate
> stream, sending a watermark every second results in about 64k msg/s
> (256 * 256 = 65,536). To illustrate:
> T1 -> P1, P2, P3
> T2 -> P1, P2, P3
> T3 -> P1, P2, P3
>  
> A better approach is to have a single downstream consumer perform the
> aggregation and then broadcast the result to all other partitions. This is
> safe by a simple argument: if P1 has received watermark t from all of T1, T2,
> and T3, then every message before t has already been published to P1, P2,
> and P3 (though possibly not yet consumed). So P1 can safely broadcast
> watermark t to P2 and P3. To illustrate:
>   T1   T2   T3
>     \   |   /
>        P1
>       /  \
>     P2    P3
> This reduces the total message count from O(n^2) to O(n). The cost is a
> possible delay of a few milliseconds, since the watermark is now exchanged
> twice, but the reduction in message volume outweighs it. In practice, the
> aggregating consumer can be chosen as (topic.hash() % total partitions), which
> spreads the aggregation across partitions when there are multiple
> intermediate streams.
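
To make the scheme in the description concrete, here is a minimal Python sketch. It is not taken from the Samza code or from the pull request. All function and class names are hypothetical, `zlib.crc32` merely stands in for `topic.hash()` because it is stable across runs, and taking the minimum over the per-task watermarks is an assumption about how the aggregate is formed.

```python
import zlib


def direct_broadcast_messages(num_tasks, num_partitions):
    """Messages per watermark round when every task writes to every partition."""
    return num_tasks * num_partitions


def aggregated_broadcast_messages(num_tasks, num_partitions):
    """Messages per round when tasks report to one aggregator partition,
    which then forwards the result to the remaining partitions."""
    return num_tasks + (num_partitions - 1)


def aggregator_partition(topic, num_partitions):
    """Pick the aggregating partition for a topic.

    Stand-in for (topic.hash() % total partitions); crc32 is an assumption,
    chosen only because it gives the same value in every process.
    """
    return zlib.crc32(topic.encode("utf-8")) % num_partitions


class WatermarkAggregator:
    """Hypothetical aggregator running in the consumer of the chosen partition."""

    def __init__(self, upstream_tasks):
        self._expected = set(upstream_tasks)
        self._latest = {}  # task name -> highest watermark seen so far

    def on_watermark(self, task, timestamp):
        """Record a watermark; return the aggregate once every task has reported.

        Returns None while at least one upstream task is still missing.
        The aggregate is assumed to be the minimum across tasks.
        """
        if task not in self._expected:
            raise ValueError("unknown upstream task: %s" % task)
        previous = self._latest.get(task)
        if previous is None or timestamp > previous:
            self._latest[task] = timestamp
        if len(self._latest) < len(self._expected):
            return None
        return min(self._latest.values())
```

Under these assumptions, 256 tasks and 256 partitions give 65,536 messages per round with direct broadcast and 511 with a single aggregator. The aggregator returns a value to forward to the other partitions only once T1, T2, and T3 have all reported.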



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
