Hi Chris,

Since Sonali brought this up, there are a few related questions that have been confusing me for a while:

1) Does systems.kafka.samza.msg.serde=string apply to both the input Kafka stream and the output Kafka stream? (I assume yes...)
2) In a stream-level override (systems.kafka.streams.<your output stream>.samza.msg.serde=json), is <your output stream> the output topic name?
3) Is it possible to specify different serdes for different Kafka topics?

Thanks,

Fang, Yan
[email protected]
+1 (206) 849-4108
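For reference, a minimal sketch of how these properties are usually wired up (the topic names "input-topic" and "output-topic" are placeholders, not taken from this thread): serdes are registered once under short names, a system-level default can be set, and individual streams, identified by their Kafka topic names, can override it.

  # Register the serdes under short names.
  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
  serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

  # System-level default: applies to any kafka stream without its own override.
  systems.kafka.samza.msg.serde=string

  # Stream-level overrides: for the kafka system the stream name is the topic name,
  # so different topics can use different serdes.
  systems.kafka.streams.input-topic.samza.msg.serde=string
  systems.kafka.streams.output-topic.samza.msg.serde=json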
On Wed, Mar 19, 2014 at 4:03 PM, <[email protected]> wrote:

> Ah, makes sense. So I might have to define separate serdes for the input
> and output topic. Thanks!
>
> -----Original Message-----
> From: Chris Riccomini [mailto:[email protected]]
> Sent: Wednesday, March 19, 2014 3:57 PM
> To: [email protected]
> Subject: Re: Writing my Custom Job
>
> Hey Sonali,
>
> It's the name of the stream you're sending to in the OutgoingMessageEnvelope:
>
>   collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "your-output-topic"), key, msg))
>
> Cheers,
> Chris
>
> On 3/19/14 3:54 PM, "[email protected]" <[email protected]> wrote:
>
>> Ah I see. This might be a silly question, but where can I find the
>> output stream name?
>>
>> -----Original Message-----
>> From: Chris Riccomini [mailto:[email protected]]
>> Sent: Wednesday, March 19, 2014 3:29 PM
>> To: [email protected]
>> Subject: Re: Writing my Custom Job
>>
>> Hey Sonali,
>>
>> In such a case, you probably just want to do stream-level overrides.
>> First, add the JSON serde to your config:
>>
>>   serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>>
>> And then forcibly specify the serde for your output topic(s):
>>
>>   systems.kafka.streams.<your output stream>.samza.msg.serde=json
>>
>> Cheers,
>> Chris
>>
>> On 3/19/14 3:21 PM, "[email protected]" <[email protected]> wrote:
>>
>>> One quick question:
>>>
>>> systems.kafka.samza.msg.serde=string -- is this property for the output,
>>> i.e. the new Kafka topic I'm trying to produce to?
>>>
>>> What happens if the input message is a String and the output is JSON?
>>>
>>> Thanks,
>>> Sonali
>>>
>>> -----Original Message-----
>>> From: Yan Fang [mailto:[email protected]]
>>> Sent: Wednesday, March 19, 2014 2:31 PM
>>> To: [email protected]
>>> Subject: Re: Writing my Custom Job
>>>
>>> Make sure systems.kafka.samza.msg.serde=string
>>>
>>> Fang, Yan
>>> [email protected]
>>> +1 (206) 849-4108
>>>
>>> On Wed, Mar 19, 2014 at 2:21 PM, <[email protected]> wrote:
>>>
>>>> Hmm, so I read the input, which is a String, and want to write to the
>>>> output topic also as a string. So I used the StringSerdeFactory. But
>>>> Samza complains and wants the JsonSerdeFactory:
>>>>
>>>> Exception in thread "main" org.apache.samza.SamzaException: Serde json for system kafka does not exist in configuration.
>>>>     at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>>>>     at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>>>>     at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>>>>     at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>>>>     at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:185)
>>>>     at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:183)
>>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>>>>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>>>>     at scala.collection.AbstractSet.map(Set.scala:47)
>>>>     at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:183)
>>>>     at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:180)
>>>>     at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:207)
>>>>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
>>>>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>
>>>> Why is that?
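As an aside, that exception generally means some property in the job config points the kafka system, or one of its streams, at a serde registered under the name "json" while no serializers.registry.json.class entry exists. Chris's suggestion above covers the fix; the two lines that have to agree look roughly like this (the stream name is a placeholder):

  serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
  systems.kafka.streams.your-output-topic.samza.msg.serde=json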
>>>> -----Original Message-----
>>>> From: Chris Riccomini [mailto:[email protected]]
>>>> Sent: Wednesday, March 19, 2014 12:40 PM
>>>> To: [email protected]
>>>> Subject: Re: Writing my Custom Job
>>>>
>>>> Hey Guys,
>>>>
>>>> This is a transitive dependency from Kafka. My guess is that the Kafka
>>>> pom files published to Maven are not quite working properly. We've
>>>> already had some issues with them. Yan's suggestion is an appropriate
>>>> fix. The only thing I'd add is that you can make it a runtime dependency:
>>>>
>>>>   <scope>runtime</scope>
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On 3/19/14 12:32 PM, "Yan Fang" <[email protected]> wrote:
>>>>
>>>>> Hi Sonali,
>>>>>
>>>>> I am not sure how Chris will respond. My quick fix for this problem was
>>>>> simply adding
>>>>>
>>>>>   <dependency>
>>>>>     <groupId>com.yammer.metrics</groupId>
>>>>>     <artifactId>metrics-core</artifactId>
>>>>>     <version>2.2.0</version>
>>>>>   </dependency>
>>>>>
>>>>> to the pom.xml file when compiling...
>>>>>
>>>>> Hope that will help.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Fang, Yan
>>>>> [email protected]
>>>>> +1 (206) 849-4108
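Putting Yan's dependency and Chris's scope suggestion together, the resulting pom.xml entry would presumably look something like this (version 2.2.0 as given above):

  <dependency>
    <groupId>com.yammer.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>2.2.0</version>
    <scope>runtime</scope>
  </dependency>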
>>>>> On Wed, Mar 19, 2014 at 11:23 AM, <[email protected]> wrote:
>>>>>
>>>>>> So I downloaded and built the latest samza code. I get this error:
>>>>>>
>>>>>> Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>>>>>>     at kafka.metrics.KafkaMetricsGroup$class.newTimer(Unknown Source)
>>>>>>     at kafka.producer.ProducerRequestMetrics.newTimer(Unknown Source)
>>>>>>     at kafka.producer.ProducerRequestMetrics.<init>(Unknown Source)
>>>>>>     at kafka.producer.ProducerRequestStats.<init>(Unknown Source)
>>>>>>     at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
>>>>>>     at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
>>>>>>     at kafka.utils.Pool.getAndMaybePut(Unknown Source)
>>>>>>     at kafka.producer.ProducerRequestStatsRegistry$.getProducerRequestStats(Unknown Source)
>>>>>>     at kafka.producer.SyncProducer.<init>(Unknown Source)
>>>>>>     at kafka.producer.ProducerPool$.createSyncProducer(Unknown Source)
>>>>>>     at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>>>>>>     at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>>>>>>     at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
>>>>>>     at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:208)
>>>>>>     at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
>>>>>>     at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
>>>>>>     at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:54)
>>>>>>     at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:146)
>>>>>>     at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
>>>>>>     at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:98)
>>>>>>     at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:86)
>>>>>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>>>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>>>>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>>>>>>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>>>>>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>>>>>     at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:86)
>>>>>>     at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
>>>>>>     at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:78)
>>>>>>     at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>>>>>> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>>>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>>>>     ... 30 more
>>>>>>
>>>>>> -----Original Message-----
>>>>>> From: Chris Riccomini [mailto:[email protected]]
>>>>>> Sent: Tuesday, March 18, 2014 8:06 PM
>>>>>> To: [email protected]
>>>>>> Subject: Re: Writing my Custom Job
>>>>>>
>>>>>> Hey Sonali,
>>>>>>
>>>>>> Line 65 currently looks like:
>>>>>>
>>>>>>   val amClient = new AMRMClientImpl[ContainerRequest]
>>>>>>
>>>>>> There is no call to Option().get, which is what would trigger this
>>>>>> exception. Are you running the latest Samza code from the master branch?
>>>>>>
>>>>>> Cheers,
>>>>>> Chris
>>>>>>
>>>>>> On 3/18/14 5:11 PM, "[email protected]" <[email protected]> wrote:
>>>>>>
>>>>>>> Has this happened to anyone before?
>>>>>>>
>>>>>>> Exception in thread "main" java.util.NoSuchElementException: None.get
>>>>>>>     at scala.None$.get(Option.scala:313)
>>>>>>>     at scala.None$.get(Option.scala:311)
>>>>>>>     at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:65)
>>>>>>>     at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>>>>>>>
>>>>>>> -----Original Message-----
>>>>>>> From: Garry Turkington [mailto:[email protected]]
>>>>>>> Sent: Tuesday, March 18, 2014 4:16 PM
>>>>>>> To: [email protected]
>>>>>>> Subject: RE: Writing my Custom Job
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> 1. Specifically, since I'm using Kafka I don't have to write Consumer
>>>>>>> and SystemFactory classes? Correct?
>>>>>>>
>>>>>>> Correct.
>>>>>>>
>>>>>>> 2. For the Samza StreamTask would the input be a String? i.e.
>>>>>>>    String event = (String) envelope.getMessage();
>>>>>>>
>>>>>>> Correct -- just make sure you have the serdes on the system/stream
>>>>>>> configured as string.
>>>>>>>
>>>>>>> Garry
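The reason no custom consumer or SystemFactory class is needed for Kafka is that the built-in Kafka system factory is simply declared in the job config, and the input topic is listed in task.inputs. A minimal sketch, with the task class and "input-topic" as placeholders:

  task.class=your.package.YourStreamTask
  task.inputs=kafka.input-topic

  systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
  systems.kafka.samza.msg.serde=string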
>>>>>>>
>>>>>>> -----Original Message-----
>>>>>>> From: [email protected] [mailto:[email protected]]
>>>>>>> Sent: 18 March 2014 22:57
>>>>>>> To: [email protected]
>>>>>>> Subject: RE: Writing my Custom Job
>>>>>>>
>>>>>>> Hey Chris,
>>>>>>>
>>>>>>> Thanks for the quick response!
>>>>>>>
>>>>>>> So the thing is, we have the Kafka producer independent of Samza. The
>>>>>>> idea is to test the Kafka streams with different CEP engines; one
>>>>>>> example would be Storm. That's why I have a separate Kafka job running
>>>>>>> that reads from a file and writes to a Kafka topic.
>>>>>>>
>>>>>>> So assuming there is a topic, say "input-topic" (where the message is
>>>>>>> some event of type String and the key is the actual eventId of the
>>>>>>> event), already in place, I want to write a Samza StreamTask that will
>>>>>>> read this string, parse it, and write to another Kafka topic. In other
>>>>>>> words, Job 1 is already done independent of Samza; I'm working on Job 2
>>>>>>> using Samza.
>>>>>>>
>>>>>>> 1. Specifically, since I'm using Kafka I don't have to write Consumer
>>>>>>> and SystemFactory classes? Correct?
>>>>>>> 2. For the Samza StreamTask would the input be a String? i.e.
>>>>>>>    String event = (String) envelope.getMessage();
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Sonali
>>>>>>>
>>>>>>> -----Original Message-----
>>>>>>> From: Chris Riccomini [mailto:[email protected]]
>>>>>>> Sent: Tuesday, March 18, 2014 3:16 PM
>>>>>>> To: [email protected]
>>>>>>> Subject: Re: Writing my Custom Job
>>>>>>>
>>>>>>> Hey Sonali,
>>>>>>>
>>>>>>> 1. For CSV file reading, you should check this JIRA out:
>>>>>>>
>>>>>>>    https://issues.apache.org/jira/browse/SAMZA-138
>>>>>>>
>>>>>>> 2. You don't need to write to a Kafka topic using the standard Kafka
>>>>>>> producer. You can use the collector that comes as part of the process
>>>>>>> method. Take a look at one of the hello-samza examples to see how this
>>>>>>> is done. (collector.send(...))
>>>>>>>
>>>>>>> 3. To parse the string, retrieve specific fields, etc., you should
>>>>>>> write a second StreamTask that reads from the first. The flow should
>>>>>>> look like:
>>>>>>>
>>>>>>>    <file> -> Job 1 -> Kafka topic 1 -> Job 2 -> Kafka topic 2
>>>>>>>
>>>>>>> Where "Job 1" sends messages to "Kafka topic 1" partitioned by event
>>>>>>> ID, and "Job 2" parses and retrieves specific fields and produces to
>>>>>>> "Kafka topic 2".
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Chris
>>>>>>>
>>>>>>> On 3/18/14 2:48 PM, "[email protected]" <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hey Guys,
>>>>>>>>
>>>>>>>> So I'm writing my custom job in Samza and wanted to make sure I'm not
>>>>>>>> re-inventing the wheel.
>>>>>>>>
>>>>>>>> I have a Kafka job running that reads from a CSV file and writes to a
>>>>>>>> topic. I wrote this using the Kafka producer API independent of Samza.
>>>>>>>> The output is a KeyedMessage with the key being my eventId and the
>>>>>>>> value a string corresponding to my event.
>>>>>>>>
>>>>>>>> Now, I want to write a Samza consumer that listens on my topic, parses
>>>>>>>> the string to retrieve specific fields I'm interested in, and writes
>>>>>>>> them out to a different Kafka topic.
>>>>>>>>
>>>>>>>> Are there existing classes I can leverage to do this?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Sonali
>>>>>>>>
>>>>>>>> Sonali Parthasarathy
>>>>>>>> R&D Developer, Data Insights
>>>>>>>> Accenture Technology Labs
>>>>>>>> 703-341-7432
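Pulling the advice in this thread together, here is a rough sketch of what "Job 2" could look like: a StreamTask that reads the CSV-style event string, pulls out a couple of fields, and sends the result to a second topic via the collector. The class name, the output topic "parsed-events", and the field positions are made up for illustration, and the sketch assumes the string serde is configured on both streams (and on the key).

  import org.apache.samza.system.IncomingMessageEnvelope;
  import org.apache.samza.system.OutgoingMessageEnvelope;
  import org.apache.samza.system.SystemStream;
  import org.apache.samza.task.MessageCollector;
  import org.apache.samza.task.StreamTask;
  import org.apache.samza.task.TaskCoordinator;

  public class ParseEventStreamTask implements StreamTask {
    // Hypothetical output topic; it would be given a serde in the job config like any other kafka stream.
    private static final SystemStream OUTPUT = new SystemStream("kafka", "parsed-events");

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
      // The input stream uses the string serde, so the message arrives as a String.
      String event = (String) envelope.getMessage();

      // Keep the incoming key (the event ID) so downstream partitioning is preserved.
      Object eventId = envelope.getKey();

      // Pick out the fields of interest; assumes a simple comma-separated layout.
      String[] fields = event.split(",");
      if (fields.length < 2) {
        return; // skip malformed lines in this sketch
      }
      String reduced = fields[0] + "," + fields[1];

      // Send to the output topic through the collector, as Chris suggests above.
      collector.send(new OutgoingMessageEnvelope(OUTPUT, eventId, reduced));
    }
  }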
