Hi Yan,

Thanks for your quick reply. Since we don't need to change our rules, we
decided to use the consumer to read the rules in init(), as you suggested.
Works great - thanks so much for your help!
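
A minimal sketch of that approach, for illustration only. RuleSource,
ListRuleSource, RulesTask and the rule strings are hypothetical names made
up for this example; a real job would do the loading in the init() method of
a Samza task (InitableTask plus StreamTask) and read from a real consumer
rather than an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class RulesAtInit {

    /** Hypothetical stand-in for a short-lived consumer over the rules stream. */
    interface RuleSource extends AutoCloseable {
        Iterator<String> readAll();

        @Override
        void close(); // narrowed so that closing throws no checked exception
    }

    /** In-memory source, used only to make the sketch runnable. */
    static class ListRuleSource implements RuleSource {
        private final List<String> rules;
        boolean closed = false;

        ListRuleSource(List<String> rules) {
            this.rules = rules;
        }

        @Override
        public Iterator<String> readAll() {
            return rules.iterator();
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Hypothetical task: loads the rules once, then applies them per message. */
    static class RulesTask {
        private List<String> rules = Collections.emptyList();

        // Plays the role of init(): called once, before any message is processed.
        void init(RuleSource source) {
            List<String> loaded = new ArrayList<>();
            try (RuleSource s = source) { // the source is closed when loading ends
                Iterator<String> it = s.readAll();
                while (it.hasNext()) {
                    loaded.add(it.next());
                }
            }
            rules = Collections.unmodifiableList(loaded);
        }

        // Plays the role of process(): true if any locally cached rule matches.
        boolean process(String transaction) {
            for (String rule : rules) {
                if (transaction.contains(rule)) {
                    return true;
                }
            }
            return false;
        }
    }

    public static void main(String[] args) {
        ListRuleSource source = new ListRuleSource(Arrays.asList("FRAUD", "BLOCKED"));
        RulesTask task = new RulesTask();
        task.init(source);
        System.out.println(task.process("trx-1 FRAUD")); // true
        System.out.println(task.process("trx-2 ok"));    // false
        System.out.println(source.closed);               // true
    }
}
```

Because every task instance loads its own copy in init(), no task depends on
the container's single consumer delivering the Rules stream to it. This only
fits rules that do not change while the job is running.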

Susan

On Tue, May 12, 2015 at 5:32 PM, Yan Fang <yanfang...@gmail.com> wrote:

> Hi Susan,
>
> -- "I have a custom SystemStreamPartitionGrouper" -- this is a very good
> try!
>
> -- "However, it looks like messages from the Rules stream are going to
> only one task instance, and not to all of them like I hoped. I have them
> running in the same container. Is it because there is only one offset
> value for the container for the Rules stream?"
>
> Your guess is right. Currently there is only one consumer for each
> container. The consumer accepts the SystemStreamPartitions (SSPs) from all
> the task instances as a Set<SSP>, so if the same Rules SSP is registered
> twice, the set stores it only once. That is why, even if you assign the
> Rules stream to all the tasks, when they run in the same container the
> Rules stream is assigned to only one, arbitrary task instance in that
> container.
>
> So with your custom SystemStreamPartitionGrouper, one way of solving this
> is to bring up as many containers as you have tasks (by default, the task
> count is the largest partition count among your input streams). This will
> make your job work. The limitation is that it takes more resources.
>
> Another way is to change the SystemConsumer. This requires a little more
> work. It may be a little easier if you do it on your application side. From
> Samza's perspective, it may require changing the SystemConsumer API. I
> opened SAMZA-676 <https://issues.apache.org/jira/browse/SAMZA-676> for
> your use case. You may want to look at the Consumer part of the design doc
> to get a brief idea of how to change the SystemConsumer.
>
> The last option is to duplicate the same rules stream to as many partitions
> as your input stream has. This does not require any changes in the Grouper
> or Consumer.
>
> -- "get the rule when starting up StreamTasks and then localize it."
>
> Here I mean that if you do not change the rules, you can read them in the
> init() method of the StreamTask. If the rules are in a DB, just read the DB;
> if the rules are in a stream, read the stream using a consumer and then
> close the consumer. Because it is a one-time task, a little cost is
> acceptable.
>
> Thank you.
>
> Cheers,
> Fang, Yan
> yanfang...@gmail.com
>
> On Tue, May 12, 2015 at 8:49 AM, Susan Luong <susan...@gmail.com> wrote:
>
> > Hi Yan,
> >
> > We are looking for a solution for a similar problem.
> >
> > Currently, we have 2 input streams, one transactions stream (5
> > partitions) and one rules stream (one partition). I have a custom
> > SystemStreamPartitionGrouper that assigns the SSPs like so:
> >
> > taskName0: Trx stream partition 0, Rules stream partition 0
> > taskName1: Trx stream partition 1, Rules stream partition 0
> > taskName2: Trx stream partition 2, Rules stream partition 0
> > taskName3: Trx stream partition 3, Rules stream partition 0
> > taskName4: Trx stream partition 4, Rules stream partition 0
> >
> > However, it looks like messages from the Rules stream are going to only
> > one task instance, and not to all of them like I hoped. I have them
> > running in the same container. Is it because there is only one offset
> > value for the container for the Rules stream?
> >
> > Can you please expand on what you mean by "get the rule when starting
> > up StreamTasks and then localize it."? Do you mean, loading messages
> > into a changelog stream using a bootstrap job?
> >
> > Thanks in advance for your help!
> >
> > Susan
> >
> >
> >
> >
> > On Tue, May 5, 2015 at 6:02 PM, Yan Fang <yanfang...@gmail.com> wrote:
> >
> > > If the rule does not change, we can get the rule when starting up
> > > StreamTasks and then localize it.
> > >
> > > Cheers,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Tue, May 5, 2015 at 2:41 PM, Yan Fang <yanfang...@gmail.com> wrote:
> > >
> > > > "If I understand it correctly the only viable solution at the moment
> > > > is to create a new stream for the rules messages with as many
> > > > partitions as the data stream and write each rules update message to
> > > > all partitions of the new rules stream."
> > > >
> > > > If the data is constantly changing, yes, AFAIK, this is the only
> > > > viable solution before we provide "shared store".
> > > >
> > > > Cheers,
> > > >
> > > > Fang, Yan
> > > > yanfang...@gmail.com
> > > >
> > > > On Tue, May 5, 2015 at 12:34 PM, Ueli Gallizzi <ueli.galli...@gmail.com> wrote:
> > > >
> > > >> Hi Yan,
> > > >>
> > > >> Thanks for your quick response.
> > > >>
> > > >> After I read the discussion on SAMZA-353 I think the best solution
> > > >> for my use case is a "shared state" store among StreamTasks
> > > >> described in SAMZA-402. To give you some background I have a stream
> > > >> with rules which are constantly changing and a data stream on which
> > > >> I apply the rules. The rules set is very small if you compare it
> > > >> with the data stream.
> > > >>
> > > >> If I understand it correctly the only viable solution at the moment
> > > >> is to create a new stream for the rules messages with as many
> > > >> partitions as the data stream and write each rules update message to
> > > >> all partitions of the new rules stream.
> > > >>
> > > >> Cheers,
> > > >> - ueli
> > > >>
> > > >> On Tue, May 5, 2015 at 12:06 PM, Yan Fang <yanfang...@gmail.com> wrote:
> > > >>
> > > >> > Hi Ueli,
> > > >> >
> > > >> > This feature is currently not supported by Samza. There were some
> > > >> > discussions in the JIRA - SAMZA-353
> > > >> > <https://issues.apache.org/jira/browse/SAMZA-353>.
> > > >> >
> > > >> > But there are some workarounds for this, depending on what you
> > > >> > want to achieve. If you can specify what your requirement is, we
> > > >> > can help think of a solution.
> > > >> >
> > > >> > In another thread
> > > >> > <http://mail-archives.apache.org/mod_mbox/samza-dev/201504.mbox/%3c552410e2.7000...@tivo.com%3E>,
> > > >> > Tommy Becker has a similar requirement, and he may be helpful as well.
> > > >> >
> > > >> > Cheers,
> > > >> > Fang, Yan
> > > >> > yanfang...@gmail.com
> > > >> >
> > > >> > On Tue, May 5, 2015 at 7:42 AM, Ueli Gallizzi <ueli.galli...@gmail.com> wrote:
> > > >> >
> > > >> > > Hi,
> > > >> > >
> > > >> > > Is it possible that multiple tasks read from the same input
> > > >> > > stream partition?
> > > >> > >
> > > >> > > example:
> > > >> > > task 0 stream A partition 0, stream B partition 0
> > > >> > > task 1 stream A partition 1, stream B partition 0
> > > >> > > task 2 stream A partition 3, stream B partition 0
> > > >> > >
> > > >> > > In this example all messages in stream B partition 0 would be
> > > >> > > processed by all 3 tasks.
> > > >> > >
> > > >> > > Cheers,
> > > >> > > - ueli
> > > >> > >
