Hi Yan,

Thanks for your quick reply. Since we don't need to change our rules, we decided to use the consumer to read the rules in init(), as you suggested. Works great - thanks so much for your help!

Susan

On Tue, May 12, 2015 at 5:32 PM, Yan Fang <yanfang...@gmail.com> wrote:

> Hi Susan,
>
> -- "I have a custom SystemStreamPartitionGrouper" -- this is a very good
> try!
>
> -- "However, it looks like messages from the Rules stream are going to only
> one task instance, and not to all of them like I hoped. I have them running
> in the same container. Is it because there is only one offset value for the
> container for the Rules stream?"
>
> Your guess is right. Currently there is only one consumer for each
> container. The consumer accepts the SystemStreamPartitions (SSPs) from all
> the task instances as a Set<SSP>. So if the same Rules stream SSP is added
> twice, it stores only one copy. That is why, even if you assign the Rules
> stream to all the tasks, when they are running in the same container the
> Rules stream is assigned to only one arbitrary task instance in that
> container.
>
> So, with your custom SystemStreamPartitionGrouper, one way of solving
> this is to bring up as many containers as you have tasks (by default, the
> task count is the largest partition count among your input streams). This
> will make your job work. The limitation is that it takes more resources.
>
> Another way is to change the SystemConsumer. This requires a little more
> work. If you do it on your application side, it may be a little easier.
> From Samza's perspective, it may require changing the SystemConsumer API.
> I opened SAMZA-676 <https://issues.apache.org/jira/browse/SAMZA-676> for
> your use case. You may have a look at the Consumer part of the design doc
> to get a brief idea of how to change the SystemConsumer if you want.
>
> The last option is to duplicate the same rule stream to as many partitions
> as your input stream has. This does not require any changes in the Grouper
> or the Consumer.
>
> -- "get the rule when starting up StreamTasks and then localize it."
>
> Here I mean, if you do not change the rules, you can read the rules in the
> init() method of the StreamTask.
> If the rules are in a DB, just read the DB;
> if the rules are in a stream, read the stream using a consumer and then
> close the consumer. Because it's a one-time task, a little cost is
> acceptable.
>
> Thank you.
>
> Cheers,
> Fang, Yan
> yanfang...@gmail.com
>
> On Tue, May 12, 2015 at 8:49 AM, Susan Luong <susan...@gmail.com> wrote:
>
> > Hi Yan,
> >
> > We are looking for a solution for a similar problem.
> >
> > Currently, we have 2 input streams, one transactions stream (5 partitions)
> > and one rules stream (one partition). I have a custom
> > SystemStreamPartitionGrouper that assigns the SSPs like so:
> >
> > taskName0: Trx stream partition 0, Rules stream partition 0
> > taskName1: Trx stream partition 1, Rules stream partition 0
> > taskName2: Trx stream partition 2, Rules stream partition 0
> > taskName3: Trx stream partition 3, Rules stream partition 0
> > taskName4: Trx stream partition 4, Rules stream partition 0
> >
> > However, it looks like messages from the Rules stream are going to only
> > one task instance, and not to all of them like I hoped. I have them
> > running in the same container. Is it because there is only one offset
> > value for the container for the Rules stream?
> >
> > Can you please expand on what you mean by "get the rule when starting
> > up StreamTasks and then localize it."? Do you mean loading messages into
> > a changelog stream using a bootstrap job?
> >
> > Thanks in advance for your help!
> >
> > Susan
> >
> > On Tue, May 5, 2015 at 6:02 PM, Yan Fang <yanfang...@gmail.com> wrote:
> >
> > > If the rule does not change, we can get the rule when starting up
> > > StreamTasks and then localize it.
> > >
> > > Cheers,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Tue, May 5, 2015 at 2:41 PM, Yan Fang <yanfang...@gmail.com> wrote:
> > >
> > > > "If I understand it correctly the only viable solution at the moment
> > > > is to create a new stream for the rules messages with as many
> > > > partitions as the data stream and write each rules update message to
> > > > all partitions of the new rules stream."
> > > >
> > > > If the data is constantly changing, yes, AFAIK, this is the only
> > > > viable solution before we provide "shared store".
> > > >
> > > > Cheers,
> > > >
> > > > Fang, Yan
> > > > yanfang...@gmail.com
> > > >
> > > > On Tue, May 5, 2015 at 12:34 PM, Ueli Gallizzi <ueli.galli...@gmail.com> wrote:
> > > >
> > > >> Hi Yan,
> > > >>
> > > >> Thanks for your quick response.
> > > >>
> > > >> After I read the discussion on SAMZA-353 I think the best solution
> > > >> for my use case is a "shared state" store among StreamTasks described
> > > >> in SAMZA-402. To give you some background, I have a stream with rules
> > > >> which are constantly changing and a data stream on which I apply the
> > > >> rules. The rules set is very small if you compare it with the data
> > > >> stream.
> > > >>
> > > >> If I understand it correctly the only viable solution at the moment
> > > >> is to create a new stream for the rules messages with as many
> > > >> partitions as the data stream and write each rules update message to
> > > >> all partitions of the new rules stream.
> > > >>
> > > >> Cheers,
> > > >> - ueli
> > > >>
> > > >> On Tue, May 5, 2015 at 12:06 PM, Yan Fang <yanfang...@gmail.com> wrote:
> > > >>
> > > >> > Hi Ueli,
> > > >> >
> > > >> > This feature is currently not supported by Samza. There was some
> > > >> > discussion in JIRA: SAMZA-353
> > > >> > <https://issues.apache.org/jira/browse/SAMZA-353>.
> > > >> >
> > > >> > But there are some workarounds for this, depending on what you want
> > > >> > to achieve. If you can specify what your requirement is, we can
> > > >> > help think of a solution.
> > > >> >
> > > >> > In another thread
> > > >> > <http://mail-archives.apache.org/mod_mbox/samza-dev/201504.mbox/%3c552410e2.7000...@tivo.com%3E>,
> > > >> > Tommy Becker has a similar requirement and he may be helpful as well.
> > > >> >
> > > >> > Cheers,
> > > >> > Fang, Yan
> > > >> > yanfang...@gmail.com
> > > >> >
> > > >> > On Tue, May 5, 2015 at 7:42 AM, Ueli Gallizzi <ueli.galli...@gmail.com> wrote:
> > > >> >
> > > >> > > Hi,
> > > >> > >
> > > >> > > Is it possible for multiple tasks to read from the same input
> > > >> > > stream partition?
> > > >> > >
> > > >> > > example:
> > > >> > > task 0 stream A partition 0, stream B partition 0
> > > >> > > task 1 stream A partition 1, stream B partition 0
> > > >> > > task 2 stream A partition 3, stream B partition 0
> > > >> > >
> > > >> > > In this example all messages in stream B partition 0 would be
> > > >> > > processed by all 3 tasks.
> > > >> > >
> > > >> > > Cheers,
> > > >> > > - ueli
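
For readers following the thread, here is a minimal Java sketch of the Set<SSP> behavior Yan describes. It is a simplified model, not Samza's actual classes: Ssp, group() and containerSsps() are hypothetical names made up for illustration, and the stream names "trx" and "rules" are assumptions. It reproduces Susan's grouping (five tasks, each given one Trx partition plus Rules partition 0) and shows that the union of SSPs held for a single container contains the Rules SSP only once.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

class SspSetDemo {

    // Hypothetical stand-in for a SystemStreamPartition: (stream name, partition id).
    static final class Ssp {
        final String stream;
        final int partition;

        Ssp(String stream, int partition) {
            this.stream = stream;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Ssp)) return false;
            Ssp other = (Ssp) o;
            return partition == other.partition && stream.equals(other.stream);
        }

        @Override
        public int hashCode() {
            return Objects.hash(stream, partition);
        }

        @Override
        public String toString() {
            return stream + "-" + partition;
        }
    }

    // Grouping from the thread: task i gets Trx partition i plus Rules partition 0.
    static Map<String, List<Ssp>> group(int trxPartitions) {
        Map<String, List<Ssp>> tasks = new LinkedHashMap<>();
        for (int i = 0; i < trxPartitions; i++) {
            List<Ssp> ssps = new ArrayList<>();
            ssps.add(new Ssp("trx", i));
            ssps.add(new Ssp("rules", 0));
            tasks.put("taskName" + i, ssps);
        }
        return tasks;
    }

    // Union of every task's SSPs within one container, modeled as a Set.
    // Equal SSPs collapse into a single entry.
    static Set<Ssp> containerSsps(Map<String, List<Ssp>> tasks) {
        Set<Ssp> all = new HashSet<>();
        for (List<Ssp> ssps : tasks.values()) {
            all.addAll(ssps);
        }
        return all;
    }

    public static void main(String[] args) {
        Map<String, List<Ssp>> tasks = group(5);
        int requested = 0;
        for (List<Ssp> ssps : tasks.values()) {
            requested += ssps.size();
        }
        Set<Ssp> registered = containerSsps(tasks);
        System.out.println("requested=" + requested + " registered=" + registered.size());
    }
}
```

With five tasks the grouping asks for ten assignments, but the set holds six distinct SSPs (five Trx partitions and one Rules partition). In this model the Rules partition has a single entry for the whole container, which matches the observation that only one task instance received the Rules messages.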