[ https://issues.apache.org/jira/browse/SAMZA-1627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417798#comment-16417798 ]
ASF GitHub Bot commented on SAMZA-1627:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/456


> Watermark broadcast enhancements
> --------------------------------
>
>                 Key: SAMZA-1627
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1627
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Xinyu Liu
>            Assignee: Xinyu Liu
>            Priority: Major
>
> Currently, each upstream task needs to broadcast its watermark to every
> single partition of the intermediate streams so that the consumers can
> aggregate watermarks. This is O(n^2): for 256 tasks writing to a
> 256-partition intermediate stream, it can easily result in 64k msg/s
> (256 * 256 = 65,536) if we send a watermark every second. To illustrate:
>
>     T1     T2     T3
>     | \ /\ | /\ /\ |
>     P1     P2     P3
>
> A better way is to have only one downstream consumer do the aggregation and
> then broadcast the result to all the partitions. A simple argument shows
> this is safe: if P1 has received a watermark of t from all of T1, T2, and
> T3, then all messages before t have already been published to P1, P2, and
> P3 (though they might not have been consumed yet). So P1 can safely
> broadcast the watermark t to P2 and P3. To illustrate:
>
>     T1     T2     T3
>        \   |   /
>           P1
>          /  \
>        P2    P3
>
> This reduces the total message count from O(n^2) to O(n). The cost is that
> each watermark now takes two hops, which might add a few milliseconds of
> delay; the benefit clearly outweighs this. In practice, the aggregating
> consumer can be chosen as (topic.hash() % total partitions), which spreads
> the aggregation across different consumers when there are multiple
> intermediate streams.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
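The aggregate-then-broadcast scheme proposed in the issue can be sketched in Python as follows. This is an illustration only, not Samza's implementation or API: every name here (aggregator_partition, broadcast_targets, WatermarkAggregator, and the message-count helpers) is hypothetical, watermarks are assumed to be integer timestamps, and zlib.crc32 stands in for the issue's topic.hash(). The aggregating consumer waits until every upstream task has reported, then forwards the minimum of their watermarks, since that is the latest time before which every task has already published its messages.

```python
import zlib
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


def aggregator_partition(topic: str, num_partitions: int) -> int:
    # Assumption: zlib.crc32 stands in for topic.hash(). Any hash that is
    # stable across processes works; Python's built-in str hash() is salted
    # per process, so producers and consumers could disagree on the result.
    return zlib.crc32(topic.encode("utf-8")) % num_partitions


def broadcast_targets(aggregator: int, num_partitions: int) -> List[int]:
    # Second hop: the aggregator forwards to every other partition.
    return [p for p in range(num_partitions) if p != aggregator]


def full_broadcast_messages(num_tasks: int, num_partitions: int) -> int:
    # Current scheme: every task sends a watermark to every partition.
    return num_tasks * num_partitions


def two_hop_messages(num_tasks: int, num_partitions: int) -> int:
    # Proposed scheme: every task sends to one aggregator partition,
    # which then forwards to the remaining (num_partitions - 1) partitions.
    return num_tasks + (num_partitions - 1)


@dataclass
class WatermarkAggregator:
    # Hypothetical state kept by the consumer of the aggregating partition.
    upstream_tasks: Set[str]
    latest: Dict[str, int] = field(default_factory=dict)
    emitted: Optional[int] = None

    def on_watermark(self, task: str, t: int) -> Optional[int]:
        # Keep the highest watermark seen per task (ignore stale ones).
        self.latest[task] = max(t, self.latest.get(task, t))
        # Nothing to forward until every upstream task has reported.
        if set(self.latest) != self.upstream_tasks:
            return None
        # Each task has published all messages before its own watermark,
        # so all messages before the minimum are already in every partition.
        candidate = min(self.latest.values())
        # Only forward when the aggregated watermark actually advances.
        if self.emitted is not None and candidate <= self.emitted:
            return None
        self.emitted = candidate
        return candidate


if __name__ == "__main__":
    print(full_broadcast_messages(256, 256), two_hop_messages(256, 256))
    agg = WatermarkAggregator({"T1", "T2", "T3"})
    for task, t in [("T1", 10), ("T2", 12), ("T3", 11), ("T1", 15)]:
        print(task, t, "->", agg.on_watermark(task, t))
```

Running it prints 65536 511 (the per-round message counts for 256 tasks and 256 partitions), then shows the aggregator forwarding nothing until T3 reports, forwarding 10 at that point, and forwarding 11 once T1 advances to 15. A deterministic hash is used to pick the aggregator because every producer and consumer of a topic must agree on the same aggregating partition.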