Hmm, so I read the input, which is a String, and want to write to the output
topic also as a string, so I used the StringSerdeFactory. But Samza
complains and wants the JsonSerdeFactory:
Exception in thread "main" org.apache.samza.SamzaException: Serde json for system kafka does not exist in configuration.
    at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
    at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:58)
    at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:185)
    at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:183)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
    at scala.collection.SetLike$class.map(SetLike.scala:93)
    at scala.collection.AbstractSet.map(Set.scala:47)
    at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:183)
    at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:180)
    at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:207)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Why is that?
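
For reference, the serde setup I was going for looks roughly like this in the
job properties (property names based on my reading of the Samza config docs,
so this may not be exact):

  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
  systems.kafka.samza.key.serde=string
  systems.kafka.samza.msg.serde=string
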
-----Original Message-----
From: Chris Riccomini [mailto:[email protected]]
Sent: Wednesday, March 19, 2014 12:40 PM
To: [email protected]
Subject: Re: Writing my Custom Job
Hey Guys,
This is a transitive dependency from Kafka. My guess is that the Kafka pom
files published to Maven are not quite working properly; we've already had some
issues with them. Yan's suggestion is an appropriate fix. The only thing I'd
add is that you can make it a runtime dependency:
<scope>runtime</scope>
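
Combined with Yan's snippet, that would be roughly (untested, just to show
where the scope element goes):

  <dependency>
    <groupId>com.yammer.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>2.2.0</version>
    <scope>runtime</scope>
  </dependency>
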
Cheers,
Chris
On 3/19/14 12:32 PM, "Yan Fang" <[email protected]> wrote:
>Hi Sonali,
>
>I am not sure how Chris will respond. My quick fix for this problem
>was simply adding
>
>  <dependency>
>    <groupId>com.yammer.metrics</groupId>
>    <artifactId>metrics-core</artifactId>
>    <version>2.2.0</version>
>  </dependency>
>
>to the pom.xml file when compiling...
>
>Hope that will help.
>
>Thanks,
>
>Fang, Yan
>[email protected]
>+1 (206) 849-4108
>
>
>On Wed, Mar 19, 2014 at 11:23 AM,
><[email protected]> wrote:
>
>> So I downloaded and built the latest samza code. I get this error:
>> Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>>     at kafka.metrics.KafkaMetricsGroup$class.newTimer(Unknown Source)
>>     at kafka.producer.ProducerRequestMetrics.newTimer(Unknown Source)
>>     at kafka.producer.ProducerRequestMetrics.<init>(Unknown Source)
>>     at kafka.producer.ProducerRequestStats.<init>(Unknown Source)
>>     at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
>>     at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
>>     at kafka.utils.Pool.getAndMaybePut(Unknown Source)
>>     at kafka.producer.ProducerRequestStatsRegistry$.getProducerRequestStats(Unknown Source)
>>     at kafka.producer.SyncProducer.<init>(Unknown Source)
>>     at kafka.producer.ProducerPool$.createSyncProducer(Unknown Source)
>>     at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>>     at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>>     at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
>>     at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:208)
>>     at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
>>     at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
>>     at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:54)
>>     at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:146)
>>     at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
>>     at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:98)
>>     at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:86)
>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>     at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:86)
>>     at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
>>     at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:78)
>>     at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>     ... 30 more
>>
>> -----Original Message-----
>> From: Chris Riccomini [mailto:[email protected]]
>> Sent: Tuesday, March 18, 2014 8:06 PM
>> To: [email protected]
>> Subject: Re: Writing my Custom Job
>>
>> Hey Sonali,
>>
>> Line 65 currently looks like:
>>
>> val amClient = new AMRMClientImpl[ContainerRequest]
>>
>> There is no call to Option().get, which is what would trigger this
>> exception. Are you running the latest Samza code from the master branch?
>>
>> Cheers,
>> Chris
>>
>> On 3/18/14 5:11 PM, "[email protected]"
>> <[email protected]> wrote:
>>
>> >Has this happened to anyone before?
>> >Exception in thread "main" java.util.NoSuchElementException: None.get
>> >    at scala.None$.get(Option.scala:313)
>> >    at scala.None$.get(Option.scala:311)
>> >    at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:65)
>> >    at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>> >
>> >-----Original Message-----
>> >From: Garry Turkington [mailto:[email protected]]
>> >Sent: Tuesday, March 18, 2014 4:16 PM
>> >To: [email protected]
>> >Subject: RE: Writing my Custom Job
>> >
>> >Hi,
>> >
>> >1. Specifically, since I'm using Kafka, I don't have to write
>> >Consumer and SystemFactory classes? Correct?
>> >Correct
>> >
>> >2. For the SamzaStreamTask would the input be a String? i.e.
>> > String event = (String)envelope.getMessage();
>> >
>> >Correct -- just make sure you have the serdes on the system/stream
>> >configured as string.
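>> >
>> >For example, assuming the system is named "kafka" and the input topic is
>> >"input-topic" (adjust to your actual names), roughly:
>> >
>> >  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> >  systems.kafka.streams.input-topic.samza.key.serde=string
>> >  systems.kafka.streams.input-topic.samza.msg.serde=string
>> >
>> >Double-check the exact property names against the config documentation.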
>> >
>> >Garry
>> >
>> >-----Original Message-----
>> >From: [email protected]
>> >[mailto:[email protected]]
>> >Sent: 18 March 2014 22:57
>> >To: [email protected]
>> >Subject: RE: Writing my Custom Job
>> >
>> >Hey Chris,
>> >
>> >Thanks for the quick response!
>> >
>> >So the thing is, we have the Kafka producer independent of Samza.
>> >The idea is to test the Kafka streams with different CEPs; one
>> >example would be Storm. That's why I have a separate Kafka job
>> >running that reads from a file and writes to a Kafka topic.
>> >
>> >So assuming there is already a topic, say "input-topic" (where the message
>> >is some event of type String and the key is the actual eventId of the
>> >event), I want to write a SamzaStreamTask that will read this string,
>> >parse it, and write to another Kafka topic. In other words, Job 1 is
>> >already done independent of Samza; I'm working on Job 2 using Samza.
>> >
>> >1. Specifically, since I'm using Kafka, I don't have to write
>> >Consumer and SystemFactory classes? Correct?
>> >2. For the SamzaStreamTask would the input be a String? i.e.
>> > String event = (String)envelope.getMessage();
>> >
>> >Thanks!
>> >Sonali
>> >
>> >-----Original Message-----
>> >From: Chris Riccomini [mailto:[email protected]]
>> >Sent: Tuesday, March 18, 2014 3:16 PM
>> >To: [email protected]
>> >Subject: Re: Writing my Custom Job
>> >
>> >Hey Sonali,
>> >
>> >1. For CSV file reading, you should check this JIRA out:
>> >
>> > https://issues.apache.org/jira/browse/SAMZA-138
>> >
>> >2. You don't need to write to a Kafka topic using the standard Kafka
>> >producer. You can use the collector that comes as part of the
>> >process method. Take a look at one of the hello-samza examples to
>> >see how this is done. (collector.send(...))
>> >
>> >3. To parse the string, retrieve specific fields, etc., you should
>> >write a second StreamTask that reads from the first (rough sketch
>> >after this list). The flow should look like:
>> >
>> ><file> -> Job 1 -> Kafka topic 1 -> Job 2 -> Kafka topic 2
>> >
>> >Where "Job 1" sends messages to "Kafka topic 1" partitioned by event
>> >ID, and "Job 2" parses and retrieves specific fields, and produces
>> >to "Kafka topic 2".
>> >
>> >Cheers,
>> >Chris
>> >
>> >On 3/18/14 2:48 PM, "[email protected]"
>> ><[email protected]> wrote:
>> >
>> >>Hey Guys,
>> >>
>> >>So I'm writing my custom job in Samza and wanted to make sure I'm
>> >>not re-inventing the wheel.
>> >>
>> >>I have a Kafka job running that reads from a CSV file and writes to
>> >>a topic. I wrote this using the Kafka producer API, independent of Samza.
>> >>The output is a KeyedMessage with the key being my eventId and the
>> >>value being a string corresponding to my event.
>> >>
>> >>Now, I want to write a Samza consumer that listens on my topic,
>> >>parses the string to retrieve specific fields I'm interested in, and
>> >>writes it out to a different Kafka topic.
>> >>
>> >>Are there existing classes I can leverage to do this?
>> >>
>> >>Thanks,
>> >>Sonali
>> >>
>> >>Sonali Parthasarathy
>> >>R&D Developer, Data Insights
>> >>Accenture Technology Labs
>> >>703-341-7432
>> >>
>> >>