I was also thinking of having fluentd push to Samza, but I don't know how to implement this. I'm not sure whether adding a Kafka layer between Samza and fluentd is the only option.
Does anyone else have better ideas?

Thanks,

Fang, Yan
[email protected]
+1 (206) 849-4108

On Fri, Sep 5, 2014 at 12:09 PM, Shekar Tippur <[email protected]> wrote:

> Yan,
>
> Won't it add an additional hop? It did occur to me earlier, but I was not
> sure if this is the right way to go if we have a stringent SLA-driven
> system depending on it.
>
> - Shekar
>
> On Fri, Sep 5, 2014 at 10:55 AM, Yan Fang <[email protected]> wrote:
>
> > If you already put the events into Kafka, you can make Samza accept
> > the Kafka topic, like the wikipedia-parser job in hello-samza accepts
> > the Kafka topic wikipedia-raw (see the config file).
> >
> > Thanks,
> >
> > Fang, Yan
> > [email protected]
> > +1 (206) 849-4108
> >
> > On Fri, Sep 5, 2014 at 8:48 AM, Shekar Tippur <[email protected]> wrote:
> >
> > > Awesome .. This works. Thanks a lot.
> > >
> > > Now off to my next step.
> > > I want to point to an incoming stream of events. These events are
> > > routed via fluentd. So, fluentd acts as a routing layer where it
> > > pushes the events to Kafka. Since it is a push and not a pull, any
> > > pointers on how to push it to Samza? Guessing I need a listener on
> > > Samza to collect this?
> > >
> > > - Shekar
> > >
> > > On Fri, Sep 5, 2014 at 1:03 AM, Yan Fang <[email protected]> wrote:
> > >
> > > > Aha, yes, we are almost there. I think I made a mistake in the
> > > > previous email.
> > > >
> > > > 1. modify *wikipedia-parser.properties*, NOT
> > > >    *wikipedia-feed.properties*
> > > > 2. run deploy/samza/bin/run-job.sh
> > > >    --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> > > >    --config-path=file://$PWD/deploy/samza/config/wikipedia-parser.properties
> > > >    (NOT wikipedia-feed.properties)
> > > >
> > > > Then you should see the messages in the Kafka topic,
> > > > *wikipedia-edits*
> > > >
> > > > Thanks. Let me know if you have any luck.
> > > > :)
> > > >
> > > > Cheers,
> > > >
> > > > Fang, Yan
> > > > [email protected]
> > > > +1 (206) 849-4108
> > > >
> > > > On Thu, Sep 4, 2014 at 11:19 PM, Shekar Tippur <[email protected]> wrote:
> > > >
> > > > > Just tried #3. Changed the property file wikipedia-feed.properties
> > > > >
> > > > > job.factory.class=org.apache.samza.job.local.LocalJobFactory
> > > > >
> > > > > Ran ..
> > > > >
> > > > > deploy/samza/bin/run-job.sh
> > > > > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> > > > > --config-path=file:///home/ctippur/hello-samza/deploy/samza/config/wikipedia-feed.properties
> > > > >
> > > > > I dont see any debug messages that I added to the feed or the
> > > > > parser file..
> > > > > I see messages on the kafka-consumer ..
> > > > >
> > > > > However the feed job died with the below message
> > > > >
> > > > > Exception in thread "ThreadJob" java.lang.RuntimeException: Trying to unlisten to a channel that has no listeners in it.
> > > > >     at samza.examples.wikipedia.system.WikipediaFeed.unlisten(WikipediaFeed.java:98)
> > > > >     at samza.examples.wikipedia.system.WikipediaConsumer.stop(WikipediaConsumer.java:72)
> > > > >     at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
> > > > >     at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
> > > > >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > > >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > > >     at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > > > >     at org.apache.samza.system.SystemConsumers.stop(SystemConsumers.scala:152)
> > > > >     at org.apache.samza.container.SamzaContainer.shutdownConsumers(SamzaContainer.scala:587)
> > > > >     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:512)
> > > > >     at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> > > > >
> > > > > - Shekar
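
For reference, the setup discussed in this thread (a Samza job that reads a Kafka topic and runs with the local job factory) might look roughly like the properties fragment below. This is only a sketch. The job.factory.class value and the wikipedia-raw topic come from the messages above; the task class, the ZooKeeper and broker addresses, and the remaining keys are assumptions based on a default hello-samza checkout and may differ in your environment.

```properties
# Sketch only: anything not quoted in the thread is an assumption.

# Run the job in a local thread instead of on YARN (value quoted in the thread).
job.factory.class=org.apache.samza.job.local.LocalJobFactory
job.name=wikipedia-parser

# Assumed task class from the hello-samza example project.
task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask

# Input is <system>.<stream>: here the Kafka topic "wikipedia-raw".
task.inputs=kafka.wikipedia-raw

# Define the system named "kafka"; hosts and ports are assumed local defaults.
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.metadata.broker.list=localhost:9092
```

If fluentd already publishes events to a Kafka topic, pointing task.inputs at that topic (for example kafka.fluentd-events, a hypothetical topic name) would let the job pull those events the same way, without a separate listener on the Samza side.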
