Ah I see.  This might be a silly question, but where can I find the 
output_stream name?

-----Original Message-----
From: Chris Riccomini [mailto:[email protected]]
Sent: Wednesday, March 19, 2014 3:29 PM
To: [email protected]
Subject: Re: Writing my Custom Job

Hey Sonali,

In such a case, you probably just want to do stream-level overrides.
First, add the JSON serde to your config:


serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory


Then explicitly specify the serde for your output topic(s):

  systems.kafka.streams.<your output stream>.samza.msg.serde=json
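
Putting both pieces together, a full config sketch might look like this ("my-output-topic" is a placeholder; substitute the name of the Kafka topic your job produces to):

```properties
# Register the JSON serde under the name "json"
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# System-wide default stays string for the input side
systems.kafka.samza.msg.serde=string

# Override the serde for the output topic only (hypothetical topic name)
systems.kafka.streams.my-output-topic.samza.msg.serde=json
```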

Cheers,
Chris

On 3/19/14 3:21 PM, "[email protected]"
<[email protected]> wrote:

>One quick question,
>
>systems.kafka.samza.msg.serde=string, is this property for the output
>aka the new kafka topic I'm trying to produce on?
>
>What happens if the input message is a String and the output is JSON?
>
>Thanks,
>Sonali
>
>-----Original Message-----
>From: Yan Fang [mailto:[email protected]]
>Sent: Wednesday, March 19, 2014 2:31 PM
>To: [email protected]
>Subject: Re: Writing my Custom Job
>
>make sure systems.kafka.samza.msg.serde=string
>
>
>Fang, Yan
>[email protected]
>+1 (206) 849-4108
>
>
>On Wed, Mar 19, 2014 at 2:21 PM, <[email protected]>
>wrote:
>
>> Hmm, so I read the input, which is a String, and want to write to the
>> output topic also as a string. So I used the StringSerdeFactory. But
>> my Samza job complains and wants the JsonSerdeFactory:
>> Exception in thread "main" org.apache.samza.SamzaException: Serde json for system kafka does not exist in configuration.
>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>>         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>>         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:185)
>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:183)
>>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>         at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>>         at scala.collection.SetLike$class.map(SetLike.scala:93)
>>         at scala.collection.AbstractSet.map(Set.scala:47)
>>         at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:183)
>>         at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:180)
>>         at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:207)
>>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
>>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>
>> Why is that?
>>
>> -----Original Message-----
>> From: Chris Riccomini [mailto:[email protected]]
>> Sent: Wednesday, March 19, 2014 12:40 PM
>> To: [email protected]
>> Subject: Re: Writing my Custom Job
>>
>> Hey Guys,
>>
>> This is a transitive dependency from Kafka. My guess is that the
>> Kafka pom files published to Maven are not quite working properly.
>> We've already had some issues with them. Yan's suggestion is an
>> appropriate fix. The only thing I'd add is that you can make it a
>> runtime dependency:
>>
>>       <scope>runtime</scope>
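
Putting Yan's metrics-core dependency together with the runtime scope, the resulting pom.xml entry would look roughly like this (version 2.2.0 as suggested elsewhere in the thread):

```xml
<!-- Transitive Kafka dependency missing from the published poms;
     runtime scope because it is only needed when the job runs. -->
<dependency>
  <groupId>com.yammer.metrics</groupId>
  <artifactId>metrics-core</artifactId>
  <version>2.2.0</version>
  <scope>runtime</scope>
</dependency>
```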
>>
>>
>> Cheers,
>> Chris
>>
>> On 3/19/14 12:32 PM, "Yan Fang" <[email protected]> wrote:
>>
>> >Hi Sonali,
>> >
>> >I am not sure how Chris will respond. My quick fix for this problem
>> >was simply adding
>> >
>> >  <dependency>
>> >    <groupId>com.yammer.metrics</groupId>
>> >    <artifactId>metrics-core</artifactId>
>> >    <version>2.2.0</version>
>> >  </dependency>
>> >
>> >to the pom.xml file when compiling...
>> >
>> >Hope that will help.
>> >
>> >Thanks,
>> >
>> >Fang, Yan
>> >[email protected]
>> >+1 (206) 849-4108
>> >
>> >
>> >On Wed, Mar 19, 2014 at 11:23 AM, <[email protected]> wrote:
>> >
>> >> So I downloaded and built the latest samza code. I get this error:
>> >> Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>> >>         at kafka.metrics.KafkaMetricsGroup$class.newTimer(Unknown Source)
>> >>         at kafka.producer.ProducerRequestMetrics.newTimer(Unknown Source)
>> >>         at kafka.producer.ProducerRequestMetrics.<init>(Unknown Source)
>> >>         at kafka.producer.ProducerRequestStats.<init>(Unknown Source)
>> >>         at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
>> >>         at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
>> >>         at kafka.utils.Pool.getAndMaybePut(Unknown Source)
>> >>         at kafka.producer.ProducerRequestStatsRegistry$.getProducerRequestStats(Unknown Source)
>> >>         at kafka.producer.SyncProducer.<init>(Unknown Source)
>> >>         at kafka.producer.ProducerPool$.createSyncProducer(Unknown Source)
>> >>         at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>> >>         at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>> >>         at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:208)
>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
>> >>         at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:54)
>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:146)
>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
>> >>         at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:98)
>> >>         at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:86)
>> >>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> >>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> >>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>> >>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>> >>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>> >>         at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:86)
>> >>         at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
>> >>         at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:78)
>> >>         at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>> >> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>> >>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> >>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> >>         at java.security.AccessController.doPrivileged(Native Method)
>> >>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> >>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> >>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>> >>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>> >>         ... 30 more
>> >>
>> >> -----Original Message-----
>> >> From: Chris Riccomini [mailto:[email protected]]
>> >> Sent: Tuesday, March 18, 2014 8:06 PM
>> >> To: [email protected]
>> >> Subject: Re: Writing my Custom Job
>> >>
>> >> Hey Sonali,
>> >>
>> >> Line 65 currently looks like:
>> >>
>> >>   val amClient = new AMRMClientImpl[ContainerRequest]
>> >>
>> >> There is no call to Option().get, which is what would trigger this
>> >> exception. Are you running the latest Samza code from the master
>> >> branch?
>> >>
>> >> Cheers,
>> >> Chris
>> >>
>> >> On 3/18/14 5:11 PM, "[email protected]"
>> >> <[email protected]> wrote:
>> >>
>> >> >Has this happened to anyone before?
>> >> >Exception in thread "main" java.util.NoSuchElementException: None.get
>> >> >       at scala.None$.get(Option.scala:313)
>> >> >       at scala.None$.get(Option.scala:311)
>> >> >       at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:65)
>> >> >       at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>> >> >
>> >> >-----Original Message-----
>> >> >From: Garry Turkington [mailto:[email protected]]
>> >> >Sent: Tuesday, March 18, 2014 4:16 PM
>> >> >To: [email protected]
>> >> >Subject: RE: Writing my Custom Job
>> >> >
>> >> >Hi,
>> >> >
>> >> >1. Specifically, since I'm using kafka I don't have to write
>> >> >Consumer and Systemfactory Classes? Correct?
>> >> >Correct
>> >> >
>> >> >2. For the SamzaStreamTask would the input be a String? i.e.
>> >> >    String event = (String)envelope.getMessage();
>> >> >
>> >> >Correct -- just make sure you have the serdes on the
>> >> >system/stream configured as string.
>> >> >
>> >> >Garry
>> >> >
>> >> >-----Original Message-----
>> >> >From: [email protected]
>> >> >[mailto:[email protected]]
>> >> >Sent: 18 March 2014 22:57
>> >> >To: [email protected]
>> >> >Subject: RE: Writing my Custom Job
>> >> >
>> >> >Hey Chris,
>> >> >
>> >> >Thanks for the quick response!
>> >> >
>> >> >So the thing is, we have the kafka producer independent of Samza.
>> >> >The idea is to test the kafka streams with different CEPs. So one
>> >> >example would be Storm. That's why I have a separate kafka job
>> >> >running that reads from a file and writes to a kafka topic.
>> >> >
>> >> >So assuming there is a topic say "input-topic" (where the message
>> >> >is some event of type "String" and key is the actual eventId of
>> >> >the
>> >> >event) already in place, I want to write a SamzaStreamTask that
>> >> >will read this string, parse it and write to another kafka topic.
>> >> >In other words, Job1 is already done independent of Samza. I'm
>> >> >working on Job2 using Samza.
>> >> >
>> >> >1. Specifically, since I'm using kafka I don't have to write
>> >> >Consumer and Systemfactory Classes? Correct?
>> >> >2. For the SamzaStreamTask would the input be a String? i.e.
>> >> >    String event = (String)envelope.getMessage();
>> >> >
>> >> >Thanks!
>> >> >Sonali
>> >> >
>> >> >-----Original Message-----
>> >> >From: Chris Riccomini [mailto:[email protected]]
>> >> >Sent: Tuesday, March 18, 2014 3:16 PM
>> >> >To: [email protected]
>> >> >Subject: Re: Writing my Custom Job
>> >> >
>> >> >Hey Sonali,
>> >> >
>> >> >1. For CSV file reading, you should check this JIRA out:
>> >> >
>> >> >  https://issues.apache.org/jira/browse/SAMZA-138
>> >> >
>> >> >2. You don't need to write to a Kafka topic using the standard
>> >> >Kafka producer. You can use the collector that comes as part of
>> >> >the process method. Take a look at one of the hello-samza
>> >> >examples to see how this is done. (collector.send(...))
>> >> >
>> >> >3. To parse the string, retrieve specific fields, etc, you should
>> >> >write a second StreamTask that reads from the first. The flow
>> >> >should look like:
>> >> >
>> >> ><file> -> Job 1 -> Kafka topic 1 -> Job 2 -> Kafka topic 2
>> >> >
>> >> >Where "Job 1" sends messages to "Kafka topic 1" partitioned by
>> >> >event ID, and "Job 2" parses and retrieves specific fields, and
>> >> >produces to "Kafka topic 2".
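
The parsing step in "Job 2" is plain string handling. A minimal sketch of that step (the comma-separated event layout and field positions here are illustrative assumptions, not something the thread specifies):

```java
// Hypothetical helper for "Job 2": pick selected fields out of a
// comma-separated event string.
public class EventParser {
    // Returns the fields at the given positions, trimmed of whitespace.
    public static String[] parseFields(String event, int... indexes) {
        String[] parts = event.split(",");
        String[] selected = new String[indexes.length];
        for (int i = 0; i < indexes.length; i++) {
            selected[i] = parts[indexes[i]].trim();
        }
        return selected;
    }

    public static void main(String[] args) {
        // Assumed layout: eventId,timestamp,type,payload
        String event = "42,2014-03-19T15:21:00,click,homepage";
        String[] fields = parseFields(event, 0, 2);
        System.out.println(fields[0] + "," + fields[1]); // prints 42,click
    }
}
```

Inside a StreamTask, this logic would sit in process(): cast envelope.getMessage() to String, parse it, and hand the result to collector.send(...) as described above.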
>> >> >
>> >> >Cheers,
>> >> >Chris
>> >> >
>> >> >On 3/18/14 2:48 PM, "[email protected]"
>> >> ><[email protected]> wrote:
>> >> >
>> >> >>Hey Guys,
>> >> >>
>> >> >>So I'm writing my custom job in Samza and wanted to make sure
>> >> >>I'm not re-inventing the wheel.
>> >> >>
>> >> >>I have a kafka job running that reads from a csv file and writes
>> >> >>to a topic. I wrote this using the Kafka producer API
>> >> >>independent of Samza.
>> >> >>The output is a KeyedMessage with key being my eventId and the
>> >> >>value is a string corresponding to my event.
>> >> >>
>> >> >>Now, I want to write a SamzaConsumer that listens on my topic,
>> >> >>parses the string to retrieve specific fields I'm interested in
>> >> >>and write it out to a different kafka topic.
>> >> >>
>> >> >>Are there existing classes I can leverage to do this?
>> >> >>
>> >> >>Thanks,
>> >> >>Sonali
>> >> >>
>> >> >>Sonali Parthasarathy
>> >> >>R&D Developer, Data Insights
>> >> >>Accenture Technology Labs
>> >> >>703-341-7432
>> >> >>
>> >> >>
>> >> >>________________________________
>> >> >>
>> >> >>This message is for the designated recipient only and may contain
>> >> >>privileged, proprietary, or otherwise confidential information.
>> >> >>If you have received it in error, please notify the sender
>> >> >>immediately and delete the original. Any other use of the e-mail
>> >> >>by you is prohibited. Where allowed by local law, electronic
>> >> >>communications with Accenture and its affiliates, including e-mail
>> >> >>and instant messaging (including content), may be scanned by our
>> >> >>systems for the purposes of information security and assessment of
>> >> >>internal compliance with Accenture policy.
>> >> >>________________________________
>> >> >>
>> >> >>www.accenture.com
>> >> >
>> >> >
>> >> >
>> >>
>> >>
>> >>
>> >>
>> >>
>>
>>
>>
>>
>>
>



