Hey Sonali,

It's the name of the stream you're sending to in OutgoingMessageEnvelope.

  collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "your-output-topic"), key, msg));

Cheers,
Chris

On 3/19/14 3:54 PM, "[email protected]"
<[email protected]> wrote:

>Ah I see.  This might be a silly question, but where can I find the
>output_stream name?
>
>-----Original Message-----
>From: Chris Riccomini [mailto:[email protected]]
>Sent: Wednesday, March 19, 2014 3:29 PM
>To: [email protected]
>Subject: Re: Writing my Custom Job
>
>Hey Sonali,
>
>In such a case, you probably just want to do stream-level overrides.
>First, add the JSON serde to your config:
>
>
>serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>
>
>And then forcibly specify the serde for your output topic(s):
>
>  systems.kafka.streams.<your output stream>.samza.msg.serde=json
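
Spelled out with a hypothetical topic name filled in, the two settings together would be:

```
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
systems.kafka.streams.your-output-topic.samza.msg.serde=json
```
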
>
>Cheers,
>Chris
>
>On 3/19/14 3:21 PM, "[email protected]"
><[email protected]> wrote:
>
>>One quick question,
>>
>>systems.kafka.samza.msg.serde=string, is this property for the output
>>aka the new kafka topic I'm trying to produce on?
>>
>>What happens, if the input message is a String and the output is a JSON?
>>
>>Thanks,
>>Sonali
>>
>>-----Original Message-----
>>From: Yan Fang [mailto:[email protected]]
>>Sent: Wednesday, March 19, 2014 2:31 PM
>>To: [email protected]
>>Subject: Re: Writing my Custom Job
>>
>>make sure systems.kafka.samza.msg.serde=string
>>
>>
>>Fang, Yan
>>[email protected]
>>+1 (206) 849-4108
>>
>>
>>On Wed, Mar 19, 2014 at 2:21 PM, <[email protected]>
>>wrote:
>>
>>> Hmm, so I read the input which is a String and want to write to the
>>> output topic also as a string. So I used the StringSerdeFactory. But
>>> my samza complains and wants the JsonSerdeFactory.
>>> Exception in thread "main" org.apache.samza.SamzaException: Serde json for system kafka does not exist in configuration.
>>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>>>         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>>>         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:185)
>>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:183)
>>>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>>>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>         at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>>>         at scala.collection.SetLike$class.map(SetLike.scala:93)
>>>         at scala.collection.AbstractSet.map(Set.scala:47)
>>>         at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:183)
>>>         at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:180)
>>>         at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:207)
>>>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
>>>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>
>>> Why is that?
>>>
>>> -----Original Message-----
>>> From: Chris Riccomini [mailto:[email protected]]
>>> Sent: Wednesday, March 19, 2014 12:40 PM
>>> To: [email protected]
>>> Subject: Re: Writing my Custom Job
>>>
>>> Hey Guys,
>>>
>>> This is a transitive dependency from Kafka. My guess is that the
>>> Kafka pom files published to Maven are not quite working properly.
>>> We've already had some issues with them. Yan's suggestion is an
>>> appropriate fix. The only thing I'd add is that you can make it a
>>> runtime dependency:
>>>
>>>       <scope>runtime</scope>
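
Putting Yan's coordinates and the runtime scope together, the dependency block would read (a sketch; group, artifact, and version as quoted in this thread):

```xml
<dependency>
  <groupId>com.yammer.metrics</groupId>
  <artifactId>metrics-core</artifactId>
  <version>2.2.0</version>
  <scope>runtime</scope>
</dependency>
```
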
>>>
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 3/19/14 12:32 PM, "Yan Fang" <[email protected]> wrote:
>>>
>>> >Hi Sonali,
>>> >
>>> >I am not sure how Chris will respond. My quick fix for this problem
>>> >was simply adding
>>> >
>>> >  <dependency>
>>> >    <groupId>com.yammer.metrics</groupId>
>>> >    <artifactId>metrics-core</artifactId>
>>> >    <version>2.2.0</version>
>>> >  </dependency>
>>> >
>>> >to the pom.xml file when compiling.
>>> >
>>> >Hope that will help.
>>> >
>>> >Thanks,
>>> >
>>> >Fang, Yan
>>> >[email protected]
>>> >+1 (206) 849-4108
>>> >
>>> >
>>> >On Wed, Mar 19, 2014 at 11:23 AM, <[email protected]> wrote:
>>> >
>>> >> So I downloaded and built the latest samza code. I get this error:
>>> >> Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>>> >>         at kafka.metrics.KafkaMetricsGroup$class.newTimer(Unknown Source)
>>> >>         at kafka.producer.ProducerRequestMetrics.newTimer(Unknown Source)
>>> >>         at kafka.producer.ProducerRequestMetrics.<init>(Unknown Source)
>>> >>         at kafka.producer.ProducerRequestStats.<init>(Unknown Source)
>>> >>         at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
>>> >>         at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
>>> >>         at kafka.utils.Pool.getAndMaybePut(Unknown Source)
>>> >>         at kafka.producer.ProducerRequestStatsRegistry$.getProducerRequestStats(Unknown Source)
>>> >>         at kafka.producer.SyncProducer.<init>(Unknown Source)
>>> >>         at kafka.producer.ProducerPool$.createSyncProducer(Unknown Source)
>>> >>         at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>>> >>         at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>>> >>         at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
>>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:208)
>>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
>>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
>>> >>         at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:54)
>>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:146)
>>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
>>> >>         at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:98)
>>> >>         at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:86)
>>> >>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>> >>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>> >>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>>> >>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>> >>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>> >>         at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:86)
>>> >>         at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
>>> >>         at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:78)
>>> >>         at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>>> >> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>>> >>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>> >>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>> >>         at java.security.AccessController.doPrivileged(Native Method)
>>> >>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>> >>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>> >>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>> >>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>> >>         ... 30 more
>>> >>
>>> >> -----Original Message-----
>>> >> From: Chris Riccomini [mailto:[email protected]]
>>> >> Sent: Tuesday, March 18, 2014 8:06 PM
>>> >> To: [email protected]
>>> >> Subject: Re: Writing my Custom Job
>>> >>
>>> >> Hey Sonali,
>>> >>
>>> >> Line 65 currently looks like:
>>> >>
>>> >>   val amClient = new AMRMClientImpl[ContainerRequest]
>>> >>
>>> >> There is no call to Option().get, which is what would trigger this
>>> >> exception. Are you running the latest Samza code from the master
>>>branch?
>>> >>
>>> >> Cheers,
>>> >> Chris
>>> >>
>>> >> On 3/18/14 5:11 PM, "[email protected]"
>>> >> <[email protected]> wrote:
>>> >>
>>> >> >Has this happened to anyone before?
>>> >> >Exception in thread "main" java.util.NoSuchElementException: None.get
>>> >> >       at scala.None$.get(Option.scala:313)
>>> >> >       at scala.None$.get(Option.scala:311)
>>> >> >       at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:65)
>>> >> >       at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>>> >> >
>>> >> >-----Original Message-----
>>> >> >From: Garry Turkington [mailto:[email protected]]
>>> >> >Sent: Tuesday, March 18, 2014 4:16 PM
>>> >> >To: [email protected]
>>> >> >Subject: RE: Writing my Custom Job
>>> >> >
>>> >> >Hi,
>>> >> >
>>> >> >1. Specifically, since I'm using kafka I don't have to write
>>> >> >Consumer and Systemfactory Classes? Correct?
>>> >> >Correct
>>> >> >
>>> >> >2. For the SamzaStreamTask would the input be a String? i.e.
>>> >> >    String event = (String)envelope.getMessage();
>>> >> >
>>> >> >Correct -- just make sure you have the serdes on the
>>> >> >system/stream configured as string.
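
For reference, a string serde setup along those lines might look like this (assuming the stock StringSerdeFactory that ships with Samza):

```
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
systems.kafka.samza.msg.serde=string
```
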
>>> >> >
>>> >> >Garry
>>> >> >
>>> >> >-----Original Message-----
>>> >> >From: [email protected]
>>> >> >[mailto:[email protected]]
>>> >> >Sent: 18 March 2014 22:57
>>> >> >To: [email protected]
>>> >> >Subject: RE: Writing my Custom Job
>>> >> >
>>> >> >Hey Chris,
>>> >> >
>>> >> >Thanks for the quick response!
>>> >> >
>>> >> >So the thing is, we have the kafka producer independent of Samza.
>>> >> >The idea is to test the kafka streams with different CEPs. So one
>>> >> >example would be Storm. That's why I have a separate kafka job
>>> >> >running that reads from a file and writes to a kafka topic.
>>> >> >
>>> >> >So assuming there is a topic say "input-topic" (where the message
>>> >> >is some event of type "String" and key is the actual eventId of
>>> >> >the
>>> >> >event) already in place, I want to write a SamzaStreamTask that
>>> >> >will read this string, parse it and write to another kafka topic.
>>> >> >In other words, Job 1 is already done independent of Samza. I'm
>>> >> >working on Job 2 using Samza.
>>> >> >
>>> >> >1. Specifically, since I'm using kafka I don't have to write
>>> >> >Consumer and Systemfactory Classes? Correct?
>>> >> >2. For the SamzaStreamTask would the input be a String? i.e.
>>> >> >    String event = (String)envelope.getMessage();
>>> >> >
>>> >> >Thanks!
>>> >> >Sonali
>>> >> >
>>> >> >-----Original Message-----
>>> >> >From: Chris Riccomini [mailto:[email protected]]
>>> >> >Sent: Tuesday, March 18, 2014 3:16 PM
>>> >> >To: [email protected]
>>> >> >Subject: Re: Writing my Custom Job
>>> >> >
>>> >> >Hey Sonali,
>>> >> >
>>> >> >1. For CSV file reading, you should check this JIRA out:
>>> >> >
>>> >> >  https://issues.apache.org/jira/browse/SAMZA-138
>>> >> >
>>> >> >2. You don't need to write to a Kafka topic using the standard
>>> >> >Kafka producer. You can use the collector that comes as part of
>>> >> >the process method. Take a look at one of the hello-samza
>>> >> >examples to see how this is done. (collector.send(...))
>>> >> >
>>> >> >3. To parse the string, retrieve specific fields, etc., you should
>>> >> >write a second StreamTask that reads from the first. The flow
>>> >> >should look like:
>>> >> >
>>> >> ><file> -> Job 1 -> Kafka topic 1 -> Job 2 -> Kafka topic 2
>>> >> >
>>> >> >Where "Job 1" sends messages to "Kafka topic 1" partitioned by
>>> >> >event ID, and "Job 2" parses and retrieves specific fields, and
>>> >> >produces to "Kafka topic 2".
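
As a rough illustration of the parsing half of Job 2 (the field layout and sample event are made up, and the Samza plumbing is omitted), the core string handling is plain Java:

```java
// Hypothetical parser for Job 2: pull selected fields out of a
// comma-separated event string before re-sending it downstream.
public class EventParser {

    // Extract the fields at the given indices and re-join them with commas.
    public static String extractFields(String event, int... indices) {
        String[] parts = event.split(",");
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < indices.length; i++) {
            if (i > 0) {
                out.append(',');
            }
            out.append(parts[indices[i]]);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // e.g. keep only the event id (field 0) and the payload (field 2)
        System.out.println(extractFields("event42,click,homepage", 0, 2));
    }
}
```

Inside a StreamTask's process() method, the result would go out via collector.send() as described above.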
>>> >> >
>>> >> >Cheers,
>>> >> >Chris
>>> >> >
>>> >> >On 3/18/14 2:48 PM, "[email protected]"
>>> >> ><[email protected]> wrote:
>>> >> >
>>> >> >>Hey Guys,
>>> >> >>
>>> >> >>So I'm writing my custom job in Samza and wanted to make sure
>>> >> >>I'm not re-inventing the wheel.
>>> >> >>
>>> >> >>I have a kafka job running that reads from a csv file and writes
>>> >> >>to a topic. I wrote this using the kafka producer api independent
>>> >> >>of Samza. The output is a KeyedMessage with key being my eventId
>>> >> >>and the value is a string corresponding to my event.
>>> >> >>
>>> >> >>Now, I want to write a SamzaConsumer that listens on my topic,
>>> >> >>parses the string to retrieve specific fields I'm interested in
>>> >> >>and write it out to a different kafka topic.
>>> >> >>
>>> >> >>Are there existing classes I can leverage to do this?
>>> >> >>
>>> >> >>Thanks,
>>> >> >>Sonali
>>> >> >>
>>> >> >>Sonali Parthasarathy
>>> >> >>R&D Developer, Data Insights
>>> >> >>Accenture Technology Labs
>>> >> >>703-341-7432
>>> >> >>
>>> >> >>
>>> >> >>________________________________
>>> >> >>
>>> >> >>This message is for the designated recipient only and may contain
>>> >> >>privileged, proprietary, or otherwise confidential information. If
>>> >> >>you have received it in error, please notify the sender immediately
>>> >> >>and delete the original. Any other use of the e-mail by you is
>>> >> >>prohibited. Where allowed by local law, electronic communications
>>> >> >>with Accenture and its affiliates, including e-mail and instant
>>> >> >>messaging (including content), may be scanned by our systems for
>>> >> >>the purposes of information security and assessment of internal
>>> >> >>compliance with Accenture policy.
>>> >> >>________________________________
>>> >> >>
>>> >> >>www.accenture.com
>>> >> >
>>> >> >
>>> >> >
>>> >>
>>> >>
>>> >>
>>> >>
>>>
>>>
>>>
>>>
>>>
>>
>
>
>
>
