Hey Shekar,

Sure. If your input stream has 8 partitions and is partitioned by "ip
address", then your state stream must also have 8 partitions and be
partitioned by "ip address". This is to guarantee that the StreamTask that
receives a message from the stream will have the state required to do the
table join in its local store.
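
To make the co-partitioning point concrete, here is a minimal sketch in plain Java. It is illustrative only: it uses a simplified `hashCode`-based partitioner rather than Kafka's actual default partitioner, and the class name is hypothetical. The point it demonstrates is that when both streams share the same key and the same partition count, a given key always lands in the same partition number in both, so the StreamTask owning that partition holds both the message and its state.

```java
// Illustrative only: a simplified stand-in for a Kafka-style partitioner,
// not Kafka's actual default partitioner.
public class CoPartitionDemo {
    static final int NUM_PARTITIONS = 8;

    static int partitionFor(String key) {
        // Mask off the sign bit so the modulo result is non-negative.
        return (key.hashCode() & 0x7fffffff) % NUM_PARTITIONS;
    }

    public static void main(String[] args) {
        String ip = "10.1.2.3";
        int inputPartition = partitionFor(ip); // partition of the input message
        int statePartition = partitionFor(ip); // partition of the state update
        // Same key, same partition count => same partition number, so the
        // StreamTask for this partition has the matching state locally.
        System.out.println(inputPartition == statePartition);
    }
}
```

If the two streams had different partition counts (or were keyed differently), the two calls above could disagree, and the local lookup would miss.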

Cheers,
Chris

On 9/8/14 10:51 AM, "Shekar Tippur" <[email protected]> wrote:

>Chris -
>Can you please elaborate on
>
>"Also note that if you take this approach, your state
>must be partitioned in the same way as your input stream."
>
>- Shekar
>
>On Mon, Sep 8, 2014 at 10:29 AM, Chris Riccomini <
>[email protected]> wrote:
>
>> Hey Shekar,
>>
>> You can either run some external DB that holds the data set, and you can
>> query it from a StreamTask, or you can use Samza's state store feature to
>> push data into a stream that you can then store in a partitioned
>> key-value store along with your StreamTasks. There is some documentation
>> here about the state store approach:
>>
>> http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
>>
>> Putting your data into a Kafka stream is going to require more up-front
>> effort from you, since you'll have to understand how Kafka's partitioning
>> model works, and set up some pipeline to push the updates for your state.
>> In the long run, I believe it's the better approach, though. Local
>> lookups on a key-value store should be faster than doing remote RPC calls
>> to a DB for every message. Also note that if you take this approach, your
>> state must be partitioned in the same way as your input stream.
>>
>> I'm sorry I can't give you a more definitive answer. It's really about
>> trade-offs.
>>
>>
>> Cheers,
>> Chris
>>
>> On 9/8/14 10:17 AM, "Shekar Tippur" <[email protected]> wrote:
>>
>> >Hello,
>> >
>> >I am able to read messages off of a new Kafka queue now.
>> >The next task is to enrich the data with more information. The data that
>> >is flowing in has an IP address or host name. I have a Redis cache where
>> >there is more contextual information (like the owner of the alert, SLA,
>> >etc.). The data in Redis does not change often, so this pretty much
>> >becomes a stream-table join.
>> >I can also dump the same data to a different Kafka queue and make it a
>> >stream-stream join as well.
>> >
>> >What do you guys recommend?
>> >
>> >- Shekar
>> >
>> >On Fri, Sep 5, 2014 at 4:08 PM, Chris Riccomini <
>> >[email protected]> wrote:
>> >
>> >> Hey Guys,
>> >>
>> >> I don't know a whole lot about Fluentd, but if you don't want to do
>> >> this flow:
>> >>
>> >>   Fluentd -> Kafka -> Samza
>> >>
>> >> Then the alternative is:
>> >>
>> >>   Fluentd -> Samza
>> >>
>> >> The "direct" approach (no Kafka) is going to be pretty labor-intensive
>> >> to build. You'd have to:
>> >>
>> >> 1. Implement a FluentdSystemConsumer for Samza.
>> >> 2. Write a Fluentd data output plugin, which sends to the
>> >> FluentdSystemConsumer.
>> >> 3. Figure out a way for the Fluentd data output plugin to "discover"
>> >> where the Samza FluentdSystemConsumer is located (since SamzaContainers
>> >> are deployed to dynamic hosts in YARN, and move around a lot).
>> >> 4. Implement a bare-bones FluentdSystemAdmin and FluentdSystemFactory
>> >> class (similar to the WikipediaSystemFactory in hello-samza).
>> >> 5. Decide on some partitioning model that makes sense for Fluentd.
>> >> Maybe one partition = one host? Not sure how Fluentd works here.
>> >>
>> >> My instinct is that it's going to be *far* better to use the first
>> >> approach (pipe the Fluentd events into Kafka). This will give you all
>> >> of the semantics that Kafka provides (e.g. ordering within a partition,
>> >> rewinding streams, durability, etc.).
>> >>
>> >> Cheers,
>> >> Chris
>> >>
>> >> On 9/5/14 1:36 PM, "Yan Fang" <[email protected]> wrote:
>> >>
>> >> >I was also thinking of having Fluentd push to Samza, but I don't know
>> >> >how to implement this. Not sure if adding a Kafka layer between Samza
>> >> >and Fluentd is the only option.
>> >> >
>> >> >Do other guys have better ideas?
>> >> >
>> >> >Thanks,
>> >> >
>> >> >Fang, Yan
>> >> >[email protected]
>> >> >+1 (206) 849-4108
>> >> >
>> >> >
>> >> >On Fri, Sep 5, 2014 at 12:09 PM, Shekar Tippur <[email protected]> wrote:
>> >> >
>> >> >> Yan,
>> >> >>
>> >> >> Won't it add an additional hop? It did occur to me earlier, but I
>> >> >> wasn't sure if this is the right way to go if we have a stringent,
>> >> >> SLA-driven system depending on it.
>> >> >>
>> >> >> - Shekar
>> >> >>
>> >> >>
>> >> >> On Fri, Sep 5, 2014 at 10:55 AM, Yan Fang <[email protected]> wrote:
>> >> >>
>> >> >> > If you already put the events into Kafka, you can make Samza
>> >> >> > accept the Kafka topic, just like the wikipedia-parse job in
>> >> >> > hello-samza accepts the Kafka topic wikipedia-raw (see its config
>> >> >> > file).
>> >> >> >
>> >> >> > Thanks,
>> >> >> >
>> >> >> > Fang, Yan
>> >> >> > [email protected]
>> >> >> > +1 (206) 849-4108
>> >> >> >
>> >> >> >
>> >> >> > On Fri, Sep 5, 2014 at 8:48 AM, Shekar Tippur <[email protected]> wrote:
>> >> >> >
>> >> >> > > Awesome .. This works. Thanks a lot.
>> >> >> > >
>> >> >> > > Now off to my next step.
>> >> >> > > I want to point to an incoming stream of events. These events
>> >> >> > > are routed via Fluentd. So, Fluentd acts as a routing layer that
>> >> >> > > pushes the events to Kafka. Since it is a push and not a pull,
>> >> >> > > any pointers on how to push it to Samza? I'm guessing I need a
>> >> >> > > listener on Samza to collect this?
>> >> >> > >
>> >> >> > > - Shekar
>> >> >> > >
>> >> >> > >
>> >> >> > > On Fri, Sep 5, 2014 at 1:03 AM, Yan Fang <[email protected]> wrote:
>> >> >> > >
>> >> >> > > > Aha, yes, we are almost there. I think I made a mistake in
>> >> >> > > > the previous email.
>> >> >> > > >
>> >> >> > > > 1. Modify *wikipedia-parser.properties*, NOT
>> >> >> > > > *wikipedia-feed.properties*.
>> >> >> > > > 2. Run:
>> >> >> > > >
>> >> >> > > > deploy/samza/bin/run-job.sh
>> >> >> > > > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
>> >> >> > > > --config-path=file://$PWD/deploy/samza/config/*wikipedia-parser.properties*
>> >> >> > > > (NOT *wikipedia-feed.properties*)
>> >> >> > > >
>> >> >> > > > Then you should see the messages in the Kafka topic
>> >> >> > > > *wikipedia-edits*.
>> >> >> > > >
>> >> >> > > > Thanks. Let me know if you have any luck. :)
>> >> >> > > >
>> >> >> > > > Cheers,
>> >> >> > > >
>> >> >> > > > Fang, Yan
>> >> >> > > > [email protected]
>> >> >> > > > +1 (206) 849-4108
>> >> >> > > >
>> >> >> > > >
>> >> >> > > > On Thu, Sep 4, 2014 at 11:19 PM, Shekar Tippur <[email protected]> wrote:
>> >> >> > > >
>> >> >> > > > > Just tried #3. Changed the property file
>> >> >> > > > > wikipedia-feed.properties:
>> >> >> > > > >
>> >> >> > > > > job.factory.class=org.apache.samza.job.local.LocalJobFactory
>> >> >> > > > >
>> >> >> > > > > Ran:
>> >> >> > > > >
>> >> >> > > > > deploy/samza/bin/run-job.sh
>> >> >> > > > > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
>> >> >> > > > > --config-path=file:///home/ctippur/hello-samza/deploy/samza/config/wikipedia-feed.properties
>> >> >> > > > > I don't see any debug messages that I added to the feed or
>> >> >> > > > > the parser file. I do see messages on the kafka-consumer.
>> >> >> > > > >
>> >> >> > > > > However, the feed job died with the message below:
>> >> >> > > > >
>> >> >> > > > >
>> >> >> > > > > Exception in thread "ThreadJob" java.lang.RuntimeException:
>> >> >> > > > > Trying to unlisten to a channel that has no listeners in it.
>> >> >> > > > >   at samza.examples.wikipedia.system.WikipediaFeed.unlisten(WikipediaFeed.java:98)
>> >> >> > > > >   at samza.examples.wikipedia.system.WikipediaConsumer.stop(WikipediaConsumer.java:72)
>> >> >> > > > >   at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
>> >> >> > > > >   at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
>> >> >> > > > >   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> >> > > > >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >> > > > >   at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>> >> >> > > > >   at org.apache.samza.system.SystemConsumers.stop(SystemConsumers.scala:152)
>> >> >> > > > >   at org.apache.samza.container.SamzaContainer.shutdownConsumers(SamzaContainer.scala:587)
>> >> >> > > > >   at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:512)
>> >> >> > > > >   at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
>> >> >> > > > >
>> >> >> > > > > - Shekar
>> >> >> > > > >