[
https://issues.apache.org/jira/browse/SAMZA-1627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Xinyu Liu updated SAMZA-1627:
-----------------------------
Description:
Currently each upstream task must broadcast its watermark to every single partition of
the intermediate streams so that the consumers can aggregate the watermarks. That is
O(n^2) messages: with 256 tasks and a 256-partition intermediate stream, emitting a
watermark every second already produces ~64k msg/s. To illustrate:
T1 -> P1, P2, P3
T2 -> P1, P2, P3
T3 -> P1, P2, P3
A better way to do this is to have a single downstream consumer perform the
aggregation and then broadcast the result to all the partitions. This is safe, by a
simple argument: if P1 has received watermark t from all of T1, T2, and T3, then every
message before t has already been published to (P1, P2, P3), even if not yet
consumed. So P1 can safely broadcast the watermark t to P2 and P3. To
illustrate:
T1  T2  T3
  \  |  /
    P1
   /  \
  P2   P3
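The aggregation step at P1 can be sketched roughly as follows. This is a minimal sketch, not the actual Samza classes: WatermarkAggregator, onWatermark, and the -1 sentinel are all illustrative names. The consumer keeps the latest watermark seen from each upstream task and, once every task has reported, the aggregate to broadcast is the minimum of those values.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the aggregating consumer (P1 in the example above);
// names are hypothetical, not Samza's real API.
class WatermarkAggregator {
    private final int expectedProducers;
    // Latest watermark reported by each upstream task.
    private final Map<String, Long> watermarks = new HashMap<>();

    WatermarkAggregator(int expectedProducers) {
        this.expectedProducers = expectedProducers;
    }

    // Records a watermark from one task. Returns the aggregated watermark to
    // broadcast to the other partitions, or -1 if not every task has reported yet.
    long onWatermark(String taskName, long timestamp) {
        // Keep the max per task, in case watermarks arrive out of order.
        watermarks.merge(taskName, timestamp, Math::max);
        if (watermarks.size() < expectedProducers) {
            return -1L;
        }
        // Safe to broadcast: every message before min(watermarks) has already
        // been published to all partitions (though possibly not yet consumed).
        return Collections.min(watermarks.values());
    }
}
```

Only once all of T1, T2, and T3 have reported does the aggregator emit a value, mirroring the safety argument above.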
This reduces the total message count from O(n^2) to O(n). The cost is a few
milliseconds of extra latency, since the watermark is now exchanged twice; the
benefit clearly outweighs that. In practice, the aggregate consumer can be chosen as
partition (topic.hash() % total partitions) to spread the aggregation work when there
are multiple intermediate streams.
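The partition choice above could look like the sketch below; the class and method names are hypothetical, and Math.floorMod stands in for "topic.hash() % total partitions" so a negative hashCode() cannot produce a negative partition index.

```java
// Hypothetical helper: pick the partition that performs watermark aggregation
// for a given intermediate stream. Hashing the topic name spreads the
// aggregation work across partitions when a job has several intermediate
// streams. Math.floorMod keeps the result in [0, totalPartitions).
final class AggregatePartitionChooser {
    static int aggregatePartition(String topic, int totalPartitions) {
        return Math.floorMod(topic.hashCode(), totalPartitions);
    }
}
```

Because the choice depends only on the topic name and partition count, every task computes the same aggregate partition without coordination.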
was:
Currently each upstream task needs to broadcast to every single partition of
intermediate streams in order to aggregate watermarks in the consumers. It's
O(n^2). For 256 tasks, 256-partition intermediate stream this can easily result
in 64k msg/s if we send watermark every second. To illustrate:
T1 -> P1, P2, P3
T2 -> P1, P2, P3
T3 -> P1, P2, P3
A better way to do this is to have only one downstream consumer doing the
aggregation, and then broadcast to all the partitions. This is safe as we can
do a simple proof: if P1 received watermark of t from all T1, T2, and T3, all
the messages before t have been published to (P1, P2, P3) already (might not be
consumed yet). So P1 can safely broadcast the watermark t to P2 and P3. To
illustrate:
T1  T2  T3
  \  |  /
    P1
   /  \
  P2   P3
This reduced the full message count from O(n^2) to O(n). In practice, the
aggregate consumer can be decided from the (topic.hash() % total partitions) to
spread the aggregation if we have multiple intermediate streams.
> Watermark broadcast enhancements
> --------------------------------
>
> Key: SAMZA-1627
> URL: https://issues.apache.org/jira/browse/SAMZA-1627
> Project: Samza
> Issue Type: Bug
> Reporter: Xinyu Liu
> Assignee: Xinyu Liu
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)