Hey Sonali,

In such a case, you probably just want to do stream-level overrides.
First, add the JSON serde to your config:

  serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

Then explicitly specify the serde for your output topic(s):

  systems.kafka.streams.<your output stream>.samza.msg.serde=json
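Putting the two together for your case (string input, JSON output), the relevant config would look something like this -- the output stream name is a placeholder:

```
# Register both serde factories (classes from the Samza serializers package)
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# System-level default: input messages are plain strings
systems.kafka.samza.msg.serde=string

# Stream-level override: the output topic carries JSON
systems.kafka.streams.my-output-topic.samza.msg.serde=json
```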

Cheers,
Chris

On 3/19/14 3:21 PM, "[email protected]"
<[email protected]> wrote:

>One quick question,
>
>systems.kafka.samza.msg.serde=string, is this property for the output aka
>the new kafka topic I'm trying to produce on?
>
>What happens, if the input message is a String and the output is a JSON?
>
>Thanks,
>Sonali
>
>-----Original Message-----
>From: Yan Fang [mailto:[email protected]]
>Sent: Wednesday, March 19, 2014 2:31 PM
>To: [email protected]
>Subject: Re: Writing my Custom Job
>
>make sure systems.kafka.samza.msg.serde=string
>
>
>Fang, Yan
>[email protected]
>+1 (206) 849-4108
>
>
>On Wed, Mar 19, 2014 at 2:21 PM, <[email protected]>
>wrote:
>
>> Hmm, so I read the input which is a String and want to write to the
>> output topic also as a string. So I used the StringSerdeFactory. But
>> Samza complains and wants the JsonSerdeFactory.
>> Exception in thread "main" org.apache.samza.SamzaException: Serde json
>> for system kafka does not exist in configuration.
>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>>         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>>         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:185)
>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:183)
>>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>         at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>>         at scala.collection.SetLike$class.map(SetLike.scala:93)
>>         at scala.collection.AbstractSet.map(Set.scala:47)
>>         at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:183)
>>         at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:180)
>>         at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:207)
>>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
>>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>
>> Why is that?
>>
>> -----Original Message-----
>> From: Chris Riccomini [mailto:[email protected]]
>> Sent: Wednesday, March 19, 2014 12:40 PM
>> To: [email protected]
>> Subject: Re: Writing my Custom Job
>>
>> Hey Guys,
>>
>> This is a transitive dependency from Kafka. My guess is that the Kafka
>> pom files published to maven are not quite working properly. We've
>> already had some issues with them. Yan's suggestion is an appropriate
>> fix. The only thing I'd add is that you can make them a runtime
>>dependency.
>>
>>       <scope>runtime</scope>
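For reference, Yan's dependency with the runtime scope added would look something like this in the pom.xml (version as Yan suggested):

```
<dependency>
  <groupId>com.yammer.metrics</groupId>
  <artifactId>metrics-core</artifactId>
  <version>2.2.0</version>
  <scope>runtime</scope>
</dependency>
```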
>>
>>
>> Cheers,
>> Chris
>>
>> On 3/19/14 12:32 PM, "Yan Fang" <[email protected]> wrote:
>>
>> >Hi Sonali,
>> >
>> >I am not sure how Chris will respond. My quick fix for this problem
>> >was simply adding
>> >
>> ><dependency>
>> >  <groupId>com.yammer.metrics</groupId>
>> >  <artifactId>metrics-core</artifactId>
>> >  <version>2.2.0</version>
>> ></dependency>
>> >
>> >to the pom.xml file when compiling.
>> >
>> >Hope that will help.
>> >
>> >Thanks,
>> >
>> >Fang, Yan
>> >[email protected]
>> >+1 (206) 849-4108
>> >
>> >
>> >On Wed, Mar 19, 2014 at 11:23 AM,
>> ><[email protected]>wrote:
>> >
>> >> So I downloaded and built the latest samza code. I get this error:
>> >> Exception in thread "main" java.lang.NoClassDefFoundError:
>> >> com/yammer/metrics/Metrics
>> >>         at kafka.metrics.KafkaMetricsGroup$class.newTimer(Unknown Source)
>> >>         at kafka.producer.ProducerRequestMetrics.newTimer(Unknown Source)
>> >>         at kafka.producer.ProducerRequestMetrics.<init>(Unknown Source)
>> >>         at kafka.producer.ProducerRequestStats.<init>(Unknown Source)
>> >>         at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
>> >>         at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
>> >>         at kafka.utils.Pool.getAndMaybePut(Unknown Source)
>> >>         at kafka.producer.ProducerRequestStatsRegistry$.getProducerRequestStats(Unknown Source)
>> >>         at kafka.producer.SyncProducer.<init>(Unknown Source)
>> >>         at kafka.producer.ProducerPool$.createSyncProducer(Unknown Source)
>> >>         at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>> >>         at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>> >>         at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:208)
>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
>> >>         at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:54)
>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:146)
>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
>> >>         at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:98)
>> >>         at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:86)
>> >>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> >>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> >>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>> >>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>> >>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>> >>         at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:86)
>> >>         at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
>> >>         at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:78)
>> >>         at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>> >> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>> >>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> >>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> >>         at java.security.AccessController.doPrivileged(Native Method)
>> >>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> >>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> >>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>> >>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>> >>         ... 30 more
>> >>
>> >> -----Original Message-----
>> >> From: Chris Riccomini [mailto:[email protected]]
>> >> Sent: Tuesday, March 18, 2014 8:06 PM
>> >> To: [email protected]
>> >> Subject: Re: Writing my Custom Job
>> >>
>> >> Hey Sonali,
>> >>
>> >> Line 65 currently looks like:
>> >>
>> >>   val amClient = new AMRMClientImpl[ContainerRequest]
>> >>
>> >> There is no call to Option().get, which is what would trigger this
>> >> exception. Are you running the latest Samza code from the master
>>branch?
>> >>
>> >> Cheers,
>> >> Chris
>> >>
>> >> On 3/18/14 5:11 PM, "[email protected]"
>> >> <[email protected]> wrote:
>> >>
>> >> >Has this happened to anyone before?
>> >> >Exception in thread "main" java.util.NoSuchElementException: None.get
>> >> >       at scala.None$.get(Option.scala:313)
>> >> >       at scala.None$.get(Option.scala:311)
>> >> >       at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:65)
>> >> >       at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>> >> >
>> >> >-----Original Message-----
>> >> >From: Garry Turkington [mailto:[email protected]]
>> >> >Sent: Tuesday, March 18, 2014 4:16 PM
>> >> >To: [email protected]
>> >> >Subject: RE: Writing my Custom Job
>> >> >
>> >> >Hi,
>> >> >
>> >> >1. Specifically, since I'm using kafka I don't have to write
>> >> >Consumer and Systemfactory Classes? Correct?
>> >> >Correct
>> >> >
>> >> >2. For the SamzaStreamTask would the input be a String? i.e.
>> >> >    String event = (String)envelope.getMessage();
>> >> >
>> >> >Correct -- just make sure you have the serdes on the system/stream
>> >> >configured as string.
>> >> >
>> >> >Garry
>> >> >
>> >> >-----Original Message-----
>> >> >From: [email protected]
>> >> >[mailto:[email protected]]
>> >> >Sent: 18 March 2014 22:57
>> >> >To: [email protected]
>> >> >Subject: RE: Writing my Custom Job
>> >> >
>> >> >Hey Chris,
>> >> >
>> >> >Thanks for the quick response!
>> >> >
>> >> >So the thing is, we have the kafka producer independent of Samza.
>> >> >The idea is to test the kafka streams with different CEPs. So one
>> >> >example would be Storm. That's why I have a separate kafka job
>> >> >running that reads from a file and writes to a kafka topic.
>> >> >
>> >> >So assuming there is a topic say "input-topic" (where the message
>> >> >is some event of type "String" and key is the actual eventId of
>> >> >the
>> >> >event) already in place, I want to write a SamzaStreamTask that
>> >> >will read this string, parse it and write to another kafka topic.
>> >> >In other words, Job1 is already done independent of Samza. I'm
>> >> >working on Job2 using Samza.
>> >> >
>> >> >1. Specifically, since I'm using kafka I don't have to write
>> >> >Consumer and Systemfactory Classes? Correct?
>> >> >2. For the SamzaStreamTask would the input be a String? i.e.
>> >> >    String event = (String)envelope.getMessage();
>> >> >
>> >> >Thanks!
>> >> >Sonali
>> >> >
>> >> >-----Original Message-----
>> >> >From: Chris Riccomini [mailto:[email protected]]
>> >> >Sent: Tuesday, March 18, 2014 3:16 PM
>> >> >To: [email protected]
>> >> >Subject: Re: Writing my Custom Job
>> >> >
>> >> >Hey Sonali,
>> >> >
>> >> >1. For CSV file reading, you should check this JIRA out:
>> >> >
>> >> >  https://issues.apache.org/jira/browse/SAMZA-138
>> >> >
>> >> >2. You don't need to write to a Kafka topic using the standard
>> >> >Kafka producer. You can use the collector that comes as part of
>> >> >the process method. Take a look at one of the hello-samza examples
>> >> >to see how this is done. (collector.send(...))
>> >> >
>> >> >3. To parse the string, retrieve specific fields, etc, you should
>> >> >write a second StreamTask that reads from the first. The flow
>> >> >should look
>> >>like:
>> >> >
>> >> ><file> -> Job 1 -> Kafka topic 1 -> Job 2 -> Kafka topic 2
>> >> >
>> >> >Where "Job 1" sends messages to "Kafka topic 1" partitioned by
>> >> >event ID, and "Job 2" parses and retrieves specific fields, and
>> >> >produces to "Kafka topic 2".
>> >> >
>> >> >Cheers,
>> >> >Chris
>> >> >
>> >> >On 3/18/14 2:48 PM, "[email protected]"
>> >> ><[email protected]> wrote:
>> >> >
>> >> >>Hey Guys,
>> >> >>
>> >> >>So I'm writing my custom job in Samza and wanted to make sure I'm
>> >> >>not re-inventing the wheel.
>> >> >>
>> >> >>I have a kafka job running that reads from a csv file and writes
>> >> >>to a topic. I wrote this using the kafka producer api independent
>> >> >>of Samza.
>> >> >>The output is a KeyedMessage with key being my eventId and the
>> >> >>value is a string corresponding to my event.
>> >> >>
>> >> >>Now, I want to write a SamzaConsumer that listens on my topic,
>> >> >>parses the string to retrieve specific fields I'm interested in
>> >> >>and write it out to a different kafka topic.
>> >> >>
>> >> >>Are there existing classes I can leverage to do this?
>> >> >>
>> >> >>Thanks,
>> >> >>Sonali
>> >> >>
>> >> >>Sonali Parthasarathy
>> >> >>R&D Developer, Data Insights
>> >> >>Accenture Technology Labs
>> >> >>703-341-7432
>> >> >>
>> >> >>
>> >> >>________________________________
>> >> >>
>> >> >>This message is for the designated recipient only and may contain
>> >> >>privileged, proprietary, or otherwise confidential information.
>> >> >>If you have received it in error, please notify the sender
>> >> >>immediately and delete the original. Any other use of the e-mail
>> >> >>by you is prohibited. Where allowed by local law, electronic
>> >> >>communications with Accenture and its affiliates, including e-mail
>> >> >>and instant messaging (including content), may be scanned by our
>> >> >>systems for the purposes of information security and assessment of
>> >> >>internal compliance with Accenture policy.
>> >> >>________________________________
>> >> >>
>> >> >>www.accenture.com
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >>
>> >>
>> >>
>> >>
>>
>>
>>
>>
>
