[ 
https://issues.apache.org/jira/browse/SAMZA-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14543413#comment-14543413
 ] 

Yan Fang commented on SAMZA-676:
--------------------------------

Thanks for reviewing, Navina.

{quote}
I think usecase #3 and #4 are very similar. I have seen many instances of #4 
coming up (reg. bootstrap stream) which will work well with global state 
implementation.
{quote}

Yes, agreed. Since use cases #1 and #2 still remain, I think this feature is still 
worth implementing. Although there is a workaround on the mailing list, I'd 
prefer to provide this out of the box. It's a "major", not a "critical", feature. :)

{quote}
a. Do all broadcast streams have only 1 partition?
{quote}

No, that's just an example. We can have any number of broadcast SSPs, as long as 
users configure them.
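
To make the intended assignment concrete, here is a minimal sketch of adding every configured broadcast SSP to every task's inputs, on top of the task's regular SSPs. It assumes SSPs and task names are plain strings of the form system.stream#partition rather than Samza's SystemStreamPartition and TaskName classes; the addBroadcast helper is hypothetical.

```java
import java.util.*;

/**
 * Hypothetical sketch: give every task all configured broadcast SSPs in
 * addition to its regular SSPs. SSPs are modeled as plain strings.
 */
public class BroadcastAssignment {

    static Map<String, Set<String>> addBroadcast(
            Map<String, Set<String>> taskToSsps, Set<String> broadcastSsps) {
        Map<String, Set<String>> result = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : taskToSsps.entrySet()) {
            // Copy the task's regular SSPs, then add every broadcast SSP.
            Set<String> ssps = new TreeSet<>(e.getValue());
            ssps.addAll(broadcastSsps);
            result.put(e.getKey(), ssps);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> tasks = new TreeMap<>();
        tasks.put("task1", new TreeSet<>(List.of("kafka.stream1#0", "kafka.stream2#0")));
        tasks.put("task2", new TreeSet<>(List.of("kafka.stream1#1", "kafka.stream2#1")));
        // Any number of broadcast SSPs can be configured, not just one.
        Set<String> broadcast = Set.of("kafka.broad-stream#0", "kafka.broad-stream#1");
        System.out.println(addBroadcast(tasks, broadcast));
    }
}
```

Both tasks end up with the same two broadcast SSPs plus their own stream1/stream2 partitions.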

{quote}
b. How does this affect the consumer’s messagechooser priority? does it provide 
more priority to broadcast stream by default ? In general, my question is how 
will each task proceed at the same rate. We could have hot partitions and those 
tasks may not react to the broadcast stream at the same time as other tasks.
{quote}

With the current consumer implementation, there is no way to guarantee that 
"each task proceeds at the same rate" if more than one task is in the same 
container. We only have one consumer per system per container, and the 
MessageChooser is also per container, so it's not possible for two tasks to 
receive the same messages at the same time.

I do not distinguish the broadcast stream from normal streams at the 
consumer level. So if users want to give the broadcast stream a higher 
priority, they can set the priority config 
systems.kafka.streams.*broadcastStream*.samza.priority=2.
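
As a rough illustration of what a per-stream priority does, the sketch below models a single container-level chooser that always returns the buffered message from the highest-priority stream, breaking ties by arrival order. It is a simplified model, not Samza's MessageChooser interface; the Envelope record and the default priority of 0 for unconfigured streams are assumptions of the sketch.

```java
import java.util.*;

/** Simplified model of one per-container chooser with per-stream priorities. */
public class PriorityChooserSketch {

    /** A buffered message; only the stream name matters for choosing. */
    record Envelope(String stream, String payload) {}

    private final Map<String, Integer> priorities;
    private final List<Envelope> buffered = new ArrayList<>();

    PriorityChooserSketch(Map<String, Integer> priorities) {
        this.priorities = priorities;
    }

    /** Buffers a message fetched by the container's single consumer. */
    void update(Envelope envelope) {
        buffered.add(envelope);
    }

    /** Removes and returns the next message to process, or null if none is buffered. */
    Envelope choose() {
        int bestIndex = -1;
        int bestPriority = 0;
        for (int i = 0; i < buffered.size(); i++) {
            // Assumption: streams without a configured priority default to 0.
            int p = priorities.getOrDefault(buffered.get(i).stream(), 0);
            // Strict '>' keeps the earliest-arrived message among equal priorities.
            if (bestIndex < 0 || p > bestPriority) {
                bestIndex = i;
                bestPriority = p;
            }
        }
        return bestIndex < 0 ? null : buffered.remove(bestIndex);
    }

    public static void main(String[] args) {
        PriorityChooserSketch chooser =
            new PriorityChooserSketch(Map.of("broadcastStream", 2));
        chooser.update(new Envelope("stream1", "a"));
        chooser.update(new Envelope("broadcastStream", "b"));
        chooser.update(new Envelope("stream1", "c"));
        for (Envelope e = chooser.choose(); e != null; e = chooser.choose()) {
            System.out.println(e.stream() + " -> " + e.payload());
        }
    }
}
```

With broadcastStream at priority 2, its message is returned first, and the two stream1 messages follow in arrival order. Because one chooser serves the whole container, priority only orders messages; it still does not make the tasks advance in lockstep.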

{quote}
c. Is the broadcast stream also intended to make config changes at a task 
level? Isn’t that a functionality at the JC?
{quote}

I followed the same config style as task.inputs, so I'm not exactly sure which 
level it belongs to. In my opinion, this config is actually read in the 
JobCoordinator class.

{quote}
3. bq. However, this is the feature we will need for the broadcast stream. 
Because all the tasks will have the broadcast stream. When more than two tasks 
are assigned to the same container, the two broadcast streams have different 
offsets, the consumer needs to consume the same stream more than once, with 
different offsets.
> Can you explain this better?
{quote}
 
Again, assume there is only one system in the job.

Currently, the consumer has two important methods: register(SSP, offset) and 
poll(), which returns a Map<SSP, List>. 

So if we have two tasks:
task 1 has stream1-partition0, stream2-partition0, *broad-stream-partition0*
task 2 has stream1-partition1, stream2-partition1, *broad-stream-partition0*

If those two tasks are in the same container, the single consumer registers all 
of those SSPs. Since the consumer only returns a Map, when it returns 
<*broad-stream-partition0*, list>, it cannot tell whether the messages are for 
task 1 or task 2. What actually happens is that it returns a Map with only five 
keys: stream1-partition0, stream2-partition0, stream1-partition1, 
stream2-partition1, and *broad-stream-partition0*. So broad-stream-partition0 is 
processed only once, by either task 1 or task 2. Therefore, my suggestion is 
that the consumer should also return the taskName information with its result, 
e.g. task 1 -> Map, task 2 -> Map. This requires changing both the Consumer API 
and the Chooser API. Is that a little clearer?
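
To illustrate the collision, here is a simplified model of the current SSP-keyed poll() result versus the proposed task-keyed result. It uses plain strings and a fake fetch, not Samza's SystemConsumer API; the Registration record, the fetch stand-in, and the starting offsets 5 and 8 for the two tasks are hypothetical.

```java
import java.util.*;

/** Simplified model of SSP-keyed vs. task-keyed poll results in one container. */
public class BroadcastPollSketch {

    record Registration(String task, String ssp, int offset) {}

    /** Current shape: one Map keyed by SSP; the owning task is lost. */
    static Map<String, List<Integer>> pollBySsp(List<Registration> regs, int batch) {
        Map<String, Integer> startOffsets = new TreeMap<>();
        for (Registration r : regs) {
            // In this model, a second registration of the same SSP overwrites the first offset.
            startOffsets.put(r.ssp(), r.offset());
        }
        Map<String, List<Integer>> result = new TreeMap<>();
        startOffsets.forEach((ssp, start) -> result.put(ssp, fetch(start, batch)));
        return result;
    }

    /** Proposed shape: results grouped by task name, then by SSP. */
    static Map<String, Map<String, List<Integer>>> pollByTask(List<Registration> regs, int batch) {
        Map<String, Map<String, List<Integer>>> result = new TreeMap<>();
        for (Registration r : regs) {
            result.computeIfAbsent(r.task(), t -> new TreeMap<>())
                  .put(r.ssp(), fetch(r.offset(), batch));
        }
        return result;
    }

    /** Stand-in for a broker fetch: returns the offsets [start, start + n). */
    static List<Integer> fetch(int start, int n) {
        List<Integer> offsets = new ArrayList<>();
        for (int i = start; i < start + n; i++) {
            offsets.add(i);
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<Registration> regs = List.of(
            new Registration("task1", "stream1-partition0", 0),
            new Registration("task1", "stream2-partition0", 0),
            new Registration("task1", "broad-stream-partition0", 5),
            new Registration("task2", "stream1-partition1", 0),
            new Registration("task2", "stream2-partition1", 0),
            new Registration("task2", "broad-stream-partition0", 8));
        System.out.println(pollBySsp(regs, 2).keySet()); // five keys; task owner lost
        System.out.println(pollByTask(regs, 2));
    }
}
```

In this model pollBySsp yields five keys, and the broadcast partition appears once, starting from whichever registration was seen last (offset 8); pollByTask keeps task 1 at offset 5 and task 2 at offset 8.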

As I also mentioned in the design doc, this change to the Consumer and Chooser 
APIs will also help with multiple-partition subscription, because when we assign 
one partition to more than one task and those tasks are in the same container, 
we run into the same problem.

Of course, another way is to have as many consumers as there are tasks, but 
that does not seem to be a single-threaded solution...

{quote}
Why is partition number needed here? Are you suggesting that the tasks can 
consume from one partition of the broadcast stream only? 
{quote}

The partition number is there for cases where users only want a few of the 
partitions; the config should then include those partition numbers. 

{quote}
If I have a broadcast topic with 32 partitions and I want all tasks to consume 
from all of them, then specifying the config will be tedious.
{quote}

This is quite interesting. Should we encourage the broadcast topic to have that 
many partitions? It introduces more complexity into the system: how do we 
prioritize those 32 partitions within one task? In the config, we could simply 
allow something like broadcastTopic#all for convenient configuration. 
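
As a rough sketch of the broadcastTopic#all idea, assume the config value is a comma-separated list of system.stream#partition entries, where the partition may also be the literal all. The expand helper and the partitionCounts map (standing in for a stream-metadata lookup) are hypothetical, not an existing Samza config format.

```java
import java.util.*;

/** Hypothetical expansion of "system.stream#N" and "system.stream#all" entries into SSPs. */
public class BroadcastConfigSketch {

    static List<String> expand(String configValue, Map<String, Integer> partitionCounts) {
        List<String> ssps = new ArrayList<>();
        for (String entry : configValue.split(",")) {
            String trimmed = entry.trim();
            int hash = trimmed.lastIndexOf('#');
            if (hash < 0) {
                throw new IllegalArgumentException("Missing '#partition' in: " + trimmed);
            }
            String stream = trimmed.substring(0, hash);
            String partition = trimmed.substring(hash + 1);
            if (partition.equals("all")) {
                // Assumed metadata lookup: how many partitions the stream has.
                Integer count = partitionCounts.get(stream);
                if (count == null) {
                    throw new IllegalArgumentException("Unknown partition count for: " + stream);
                }
                for (int p = 0; p < count; p++) {
                    ssps.add(stream + "#" + p);
                }
            } else {
                // Integer.parseInt rejects non-numeric partition values.
                ssps.add(stream + "#" + Integer.parseInt(partition));
            }
        }
        return ssps;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("kafka.broadcastTopic", 4);
        System.out.println(expand("kafka.config-stream#0,kafka.broadcastTopic#all", counts));
    }
}
```

With a 32-partition broadcast topic, a single kafka.broadcastTopic#all entry would expand to 32 SSPs instead of 32 hand-written entries, while explicit #N entries still cover the "only a few partitions" case.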

> Implement Broadcast Stream
> --------------------------
>
>                 Key: SAMZA-676
>                 URL: https://issues.apache.org/jira/browse/SAMZA-676
>             Project: Samza
>          Issue Type: Improvement
>          Components: container
>            Reporter: Yan Fang
>            Assignee: Yan Fang
>         Attachments: BroadcastStreamDesign.md, BroadcastStreamDesign.pdf
>
>
> There has been a lot of discussion in SAMZA-353 about assigning the same SSP to 
> multiple taskNames. This ticket covers a subset of that discussion and focuses 
> only on the broadcast stream implementation. 
> The goal is to assign one SSP to all the taskNames. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
