Hi Sonali, I am not sure how Chris will response. My quick fix for this problem was simple adding the *<dependency><groupId>com.yammer.metrics</groupId><artifactId>metrics-core</artifactId><version>2.2.0</version></dependency>* to the pom.xml file when compile...
Hope that will help. Thanks, Fang, Yan [email protected] +1 (206) 849-4108 On Wed, Mar 19, 2014 at 11:23 AM, <[email protected]>wrote: > So I downloaded and built the latest samza code. I get this error: > Exception in thread "main" java.lang.NoClassDefFoundError: > com/yammer/metrics/Metrics > at kafka.metrics.KafkaMetricsGroup$class.newTimer(Unknown Source) > at kafka.producer.ProducerRequestMetrics.newTimer(Unknown Source) > at kafka.producer.ProducerRequestMetrics.<init>(Unknown Source) > at kafka.producer.ProducerRequestStats.<init>(Unknown Source) > at > kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source) > at > kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source) > at kafka.utils.Pool.getAndMaybePut(Unknown Source) > at > kafka.producer.ProducerRequestStatsRegistry$.getProducerRequestStats(Unknown > Source) > at kafka.producer.SyncProducer.<init>(Unknown Source) > at kafka.producer.ProducerPool$.createSyncProducer(Unknown Source) > at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source) > at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source) > at > org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40) > at > org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:208) > at > org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149) > at > org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149) > at > org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:54) > at > org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:146) > at > org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125) > at > org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:98) > at > org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:86) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at > scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at > org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:86) > at > org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79) > at > org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:78) > at > org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala) > Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics > at java.net.URLClassLoader$1.run(URLClassLoader.java:366) > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > at java.lang.ClassLoader.loadClass(ClassLoader.java:425) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) > at java.lang.ClassLoader.loadClass(ClassLoader.java:358) > ... 30 more > > -----Original Message----- > From: Chris Riccomini [mailto:[email protected]] > Sent: Tuesday, March 18, 2014 8:06 PM > To: [email protected] > Subject: Re: Writing my Custom Job > > Hey Sonali, > > Line 65 currently looks like: > > val amClient = new AMRMClientImpl[ContainerRequest] > > There is no call to Option().get, which is what would trigger this > exception. Are you running the latest Samza code from the master branch? > > Cheers, > Chris > > On 3/18/14 5:11 PM, "[email protected]" > <[email protected]> wrote: > > >Has this happened to anyone before? > >Exception in thread "main" java.util.NoSuchElementException: None.get > > at scala.None$.get(Option.scala:313) > > at scala.None$.get(Option.scala:311) > > at > >org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:65) > > at > org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala) > > > >-----Original Message----- > >From: Garry Turkington [mailto:[email protected]] > >Sent: Tuesday, March 18, 2014 4:16 PM > >To: [email protected] > >Subject: RE: Writing my Custom Job > > > >Hi, > > > >1. Specifically, since I'm using kafka I don't have to write Consumer > >and Systemfactory Classes? Correct? > >Correct > > > >2. For the SamzaStreamTask would the input be a String? i.e. > > String event = (String)envelope.getMessage(); > > > >Correct -- just make sure you have the serdes on the system/stream > >configured as string. > > > >Garry > > > >-----Original Message----- > >From: [email protected] > >[mailto:[email protected]] > >Sent: 18 March 2014 22:57 > >To: [email protected] > >Subject: RE: Writing my Custom Job > > > >Hey Chris, > > > >Thanks for the quick response! > > > >So the thing is, we have the kafka producer independent of Samza. The > >idea is to test the kafka streams with different CEPs. So one example > >would be Storm. That's why I have a separate kafka job running that > >reads from a file and writes to a kafka topic. > > > >So assuming there is a topic say "input-topic" (where the message is > >some event of type "String" and key is the actual eventId of the event) > >already in place, I want to write a SamzaStreamTask that will read this > >string, parse it and write to another kafka topic. In other words, Job1 > >is already done independent of Samza. I'm working on Job2 using Samza. > > > >1. Specifically, since I'm using kafka I don't have to write Consumer > >and Systemfactory Classes? Correct? > >2. For the SamzaStreamTask would the input be a String? i.e. > > String event = (String)envelope.getMessage(); > > > >Thanks! > >Sonali > > > >-----Original Message----- > >From: Chris Riccomini [mailto:[email protected]] > >Sent: Tuesday, March 18, 2014 3:16 PM > >To: [email protected] > >Subject: Re: Writing my Custom Job > > > >Hey Sonali, > > > >1. For CSV file reading, you should check this JIRA out: > > > > https://issues.apache.org/jira/browse/SAMZA-138 > > > >2. You don't need to write to a Kafka topic using the standard Kafka > >producer. You can use the collector that comes as part of the process > >method. Take a look at one of the hello-samza examples to see how this > >is done. (collector.send(...)) > > > >3. To parse the string, retrieve specific fields, etc, you should write > >a second StreamTask that reads from the first. The flow should look like: > > > ><file> -> Job 1 -> Kafka topic 1 -> Job 2 -> Kafka topic 2 > > > >Where "Job 1" sends messages to "Kafka topic 1" partitioned by event > >ID, and "Job 2" parses and retrieves specific fields, and produces to > >"Kafka topic 2". > > > >Cheers, > >Chris > > > >On 3/18/14 2:48 PM, "[email protected]" > ><[email protected]> wrote: > > > >>Hey Guys, > >> > >>So I'm writing my custom job in Samza and wanted to make sure I'm not > >>re-inventing the wheel. > >> > >>I have a kafka job running that reads from a csv file and writes to a > >>topic. I wrote this using the kafka producer api independent of Samza. > >>The output is a KeyedMessage with key being my eventId and the value > >>is a string corresponding to my event. > >> > >>Now, I want to write a SamzaConsumer that listens on my topic, parses > >>the string to retrieve specific fields I'm interested in and write it > >>out to a different kafka topic. > >> > >>Are there existing classes I can leverage to do this? > >> > >>Thanks, > >>Sonali > >> > >>Sonali Parthasarathy > >>R&D Developer, Data Insights > >>Accenture Technology Labs > >>703-341-7432 > >> > >> > >>________________________________ > >> > >>This message is for the designated recipient only and may contain > >>privileged, proprietary, or otherwise confidential information. If you > >>have received it in error, please notify the sender immediately and > >>delete the original. Any other use of the e-mail by you is prohibited. > >>Where allowed by local law, electronic communications with Accenture > >>and its affiliates, including e-mail and instant messaging (including > >>content), may be scanned by our systems for the purposes of > >>information security and assessment of internal compliance with > Accenture policy. > >>______________________________________________________________________ > >>_ > >>___ > >>____________ > >> > >>www.accenture.com > > > > > > > >________________________________ > > > >This message is for the designated recipient only and may contain > >privileged, proprietary, or otherwise confidential information. If you > >have received it in error, please notify the sender immediately and > >delete the original. Any other use of the e-mail by you is prohibited. > >Where allowed by local law, electronic communications with Accenture > >and its affiliates, including e-mail and instant messaging (including > >content), may be scanned by our systems for the purposes of information > >security and assessment of internal compliance with Accenture policy. > >_______________________________________________________________________ > >___ > >____________ > > > >www.accenture.com > > > > > >----- > >No virus found in this message. > >Checked by AVG - www.avg.com > >Version: 2014.0.4259 / Virus Database: 3722/7211 - Release Date: > >03/18/14 > > > > > > > > ________________________________ > > This message is for the designated recipient only and may contain > privileged, proprietary, or otherwise confidential information. If you have > received it in error, please notify the sender immediately and delete the > original. Any other use of the e-mail by you is prohibited. Where allowed > by local law, electronic communications with Accenture and its affiliates, > including e-mail and instant messaging (including content), may be scanned > by our systems for the purposes of information security and assessment of > internal compliance with Accenture policy. > > ______________________________________________________________________________________ > > www.accenture.com > >
