Hi Chris,

Thanks for the answer! The serde configuration is much more flexible than I
thought. I'll try it in my current project.

Cheers,

Fang, Yan
[email protected]
+1 (206) 849-4108


On Thu, Mar 20, 2014 at 9:30 AM, Chris Riccomini <[email protected]> wrote:

> Hey Yan,
>
> Yes to all questions. :)
>
> 1) will systems.kafka.samza.msg.serde=string apply for both input kafka
> stream and output kafka stream? (I assume yes...)
>
> Yes. Defining a serde for a system will apply to all of that system's
> streams for both their input and output. You can override specific streams
> (again, for both input and output) using stream-level overrides
> (systems.<system name>.streams.<stream name>.samza.msg.serde).
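>
> A minimal sketch of the two levels together (the topic name "page-views"
> is hypothetical):
>
>   systems.kafka.samza.msg.serde=string
>   systems.kafka.streams.page-views.samza.msg.serde=json
>
> Here every stream on the "kafka" system defaults to the string serde,
> except the "page-views" topic, which is overridden to json.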
>
>
> 2) in stream-level override (systems.kafka.streams.<your output
> stream>.samza.msg.serde=json), is the output-stream the output topic name?
>
> This is system-implementation specific. From Samza's point of view, the
> stream is just a string that we give to the system. It's up to the system
> to decide whether the string is a URI, a file path, a topic name, etc. In
> Kafka's case, yes, the <your output stream> is just the name of the topic
> in Kafka.
>
> 3) Is it possible to specify different serdes for different Kafka topics?
>
> Yes. One option is to do stream-level overrides (systems.<system
> name>.streams.<stream name>.samza.msg.serde) on a per-stream basis.
> Sometimes you might have many streams that all have the same serde, and
> it's annoying to define them all. In such a case, we usually just define
> two systems. For example, you might define two kafka systems, one called
> "kafka-input" and one called "kafka-output). These systems might be
> configured identically, except that one will have a "json" serde, and the
> other will have a "string" serde (or whatever. You then define your
> task.inputs to take kafka-input.your-topic, and send messages with
> OutgoingMessageEnvelope to the "kafka-output" system.
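>
> As a sketch, the two-system variant might look like this (the topic name
> "your-topic" is a placeholder):
>
>   systems.kafka-input.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>   systems.kafka-input.samza.msg.serde=json
>   systems.kafka-output.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>   systems.kafka-output.samza.msg.serde=string
>   task.inputs=kafka-input.your-topic
>
> Both systems can point at the same Kafka cluster; only the serdes differ.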
>
> Cheers,
> Chris
>
> On 3/19/14 4:17 PM, "Yan Fang" <[email protected]> wrote:
>
> >Hi Chris,
> >
> >Since Sonali brings this up, there are related questions that have been
> >confusing me for a while:
> >1) will systems.kafka.samza.msg.serde=string apply for both input kafka
> >stream and output kafka stream? (I assume yes...)
> >2) in stream-level override (systems.kafka.streams.<your output
> >stream>.samza.msg.serde=json), is the output-stream the output topic name?
> >3) Is it possible to specify different serdes for different Kafka topics?
> >
> >Thanks,
> >
> >Fang, Yan
> >[email protected]
> >+1 (206) 849-4108
> >
> >
> >On Wed, Mar 19, 2014 at 4:03 PM, <[email protected]>
> >wrote:
> >
> >> Ah, makes sense. So I might have to define separate serdes for the input
> >> and output topic. Thanks!
> >>
> >>
> >> -----Original Message-----
> >> From: Chris Riccomini [mailto:[email protected]]
> >> Sent: Wednesday, March 19, 2014 3:57 PM
> >> To: [email protected]
> >> Subject: Re: Writing my Custom Job
> >>
> >> Hey Sonali,
> >>
> >> It's the name of the stream you're sending to in
> >> OutgoingMessageEnvelope:
> >>
> >>   collector.send(new OutgoingMessageEnvelope("kafka", "your-output-topic", key, msg))
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 3/19/14 3:54 PM, "[email protected]"
> >> <[email protected]> wrote:
> >>
> >> >Ah I see.  This might be a silly question, but where can I find the
> >> >output_stream name?
> >> >
> >> >-----Original Message-----
> >> >From: Chris Riccomini [mailto:[email protected]]
> >> >Sent: Wednesday, March 19, 2014 3:29 PM
> >> >To: [email protected]
> >> >Subject: Re: Writing my Custom Job
> >> >
> >> >Hey Sonali,
> >> >
> >> >In such a case, you probably just want to do stream-level overrides.
> >> >First, add the JSON serde to your config:
> >> >
> >> >
> >> >serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> >> >
> >> >
> >> >And then forcibly specify the serde for your output topic(s):
> >> >
> >> >  systems.kafka.streams.<your output stream>.samza.msg.serde=json
> >> >
> >> >Cheers,
> >> >Chris
> >> >
> >> >On 3/19/14 3:21 PM, "[email protected]"
> >> ><[email protected]> wrote:
> >> >
> >> >>One quick question,
> >> >>
> >> >>systems.kafka.samza.msg.serde=string, is this property for the output
> >> >>aka the new kafka topic I'm trying to produce on?
> >> >>
> >> >>What happens if the input message is a String and the output is JSON?
> >> >>
> >> >>Thanks,
> >> >>Sonali
> >> >>
> >> >>-----Original Message-----
> >> >>From: Yan Fang [mailto:[email protected]]
> >> >>Sent: Wednesday, March 19, 2014 2:31 PM
> >> >>To: [email protected]
> >> >>Subject: Re: Writing my Custom Job
> >> >>
> >> >>make sure systems.kafka.samza.msg.serde=string
> >> >>
> >> >>
> >> >>Fang, Yan
> >> >>[email protected]
> >> >>+1 (206) 849-4108
> >> >>
> >> >>
> >> >>On Wed, Mar 19, 2014 at 2:21 PM, <[email protected]>
> >> >>wrote:
> >> >>
> >> >>> Hmm, so I read the input, which is a String, and want to write to the
> >> >>> output topic also as a string. So I used the StringSerdeFactory. But
> >> >>> my Samza job complains and wants the JsonSerdeFactory.
> >> >>> Exception in thread "main" org.apache.samza.SamzaException: Serde
> >> >>> json for system kafka does not exist in configuration.
> >> >>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
> >> >>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
> >> >>>         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> >> >>>         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> >> >>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:185)
> >> >>>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:183)
> >> >>>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >> >>>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >> >>>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> >> >>>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >> >>>         at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> >> >>>         at scala.collection.SetLike$class.map(SetLike.scala:93)
> >> >>>         at scala.collection.AbstractSet.map(Set.scala:47)
> >> >>>         at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:183)
> >> >>>         at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:180)
> >> >>>         at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:207)
> >> >>>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
> >> >>>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> >>>
> >> >>> Why is that?
> >> >>>
> >> >>> -----Original Message-----
> >> >>> From: Chris Riccomini [mailto:[email protected]]
> >> >>> Sent: Wednesday, March 19, 2014 12:40 PM
> >> >>> To: [email protected]
> >> >>> Subject: Re: Writing my Custom Job
> >> >>>
> >> >>> Hey Guys,
> >> >>>
> >> >>> This is a transitive dependency from Kafka. My guess is that the Kafka
> >> >>> pom files published to Maven are not quite working properly. We've
> >> >>> already had some issues with them. Yan's suggestion is an appropriate
> >> >>> fix. The only thing I'd add is that you can make it a runtime
> >> >>> dependency:
> >> >>>
> >> >>>       <scope>runtime</scope>
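> >> >>>
> >> >>> Putting that together with Yan's metrics-core fix, the pom entry would
> >> >>> look something like:
> >> >>>
> >> >>>   <dependency>
> >> >>>     <groupId>com.yammer.metrics</groupId>
> >> >>>     <artifactId>metrics-core</artifactId>
> >> >>>     <version>2.2.0</version>
> >> >>>     <scope>runtime</scope>
> >> >>>   </dependency>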
> >> >>>
> >> >>>
> >> >>> Cheers,
> >> >>> Chris
> >> >>>
> >> >>> On 3/19/14 12:32 PM, "Yan Fang" <[email protected]> wrote:
> >> >>>
> >> >>> >Hi Sonali,
> >> >>> >
> >> >>> >I am not sure how Chris will respond. My quick fix for this
> >> >>> >problem was simply adding the following to the pom.xml file when
> >> >>> >compiling:
> >> >>> >
> >> >>> >  <dependency>
> >> >>> >    <groupId>com.yammer.metrics</groupId>
> >> >>> >    <artifactId>metrics-core</artifactId>
> >> >>> >    <version>2.2.0</version>
> >> >>> >  </dependency>
> >> >>> >
> >> >>> >Hope that will help.
> >> >>> >
> >> >>> >Thanks,
> >> >>> >
> >> >>> >Fang, Yan
> >> >>> >[email protected]
> >> >>> >+1 (206) 849-4108
> >> >>> >
> >> >>> >
> >> >>> >On Wed, Mar 19, 2014 at 11:23 AM,
> >> >>> ><[email protected]>wrote:
> >> >>> >
> >> >>> >> So I downloaded and built the latest samza code. I get this error:
> >> >>> >> Exception in thread "main" java.lang.NoClassDefFoundError:
> >> >>> >> com/yammer/metrics/Metrics
> >> >>> >>         at kafka.metrics.KafkaMetricsGroup$class.newTimer(Unknown Source)
> >> >>> >>         at kafka.producer.ProducerRequestMetrics.newTimer(Unknown Source)
> >> >>> >>         at kafka.producer.ProducerRequestMetrics.<init>(Unknown Source)
> >> >>> >>         at kafka.producer.ProducerRequestStats.<init>(Unknown Source)
> >> >>> >>         at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
> >> >>> >>         at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
> >> >>> >>         at kafka.utils.Pool.getAndMaybePut(Unknown Source)
> >> >>> >>         at kafka.producer.ProducerRequestStatsRegistry$.getProducerRequestStats(Unknown Source)
> >> >>> >>         at kafka.producer.SyncProducer.<init>(Unknown Source)
> >> >>> >>         at kafka.producer.ProducerPool$.createSyncProducer(Unknown Source)
> >> >>> >>         at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
> >> >>> >>         at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
> >> >>> >>         at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
> >> >>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:208)
> >> >>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
> >> >>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
> >> >>> >>         at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:54)
> >> >>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:146)
> >> >>> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
> >> >>> >>         at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:98)
> >> >>> >>         at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:86)
> >> >>> >>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >> >>> >>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >> >>> >>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> >> >>> >>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> >> >>> >>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> >> >>> >>         at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:86)
> >> >>> >>         at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
> >> >>> >>         at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:78)
> >> >>> >>         at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> >> >>> >> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
> >> >>> >>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> >> >>> >>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> >> >>> >>         at java.security.AccessController.doPrivileged(Native Method)
> >> >>> >>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> >> >>> >>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> >> >>> >>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> >> >>> >>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> >> >>> >>         ... 30 more
> >> >>> >>
> >> >>> >> -----Original Message-----
> >> >>> >> From: Chris Riccomini [mailto:[email protected]]
> >> >>> >> Sent: Tuesday, March 18, 2014 8:06 PM
> >> >>> >> To: [email protected]
> >> >>> >> Subject: Re: Writing my Custom Job
> >> >>> >>
> >> >>> >> Hey Sonali,
> >> >>> >>
> >> >>> >> Line 65 currently looks like:
> >> >>> >>
> >> >>> >>   val amClient = new AMRMClientImpl[ContainerRequest]
> >> >>> >>
> >> >>> >> There is no call to Option().get, which is what would trigger
> >> >>> >> this exception. Are you running the latest Samza code from the
> >> >>> >> master
> >> >>>branch?
> >> >>> >>
> >> >>> >> Cheers,
> >> >>> >> Chris
> >> >>> >>
> >> >>> >> On 3/18/14 5:11 PM, "[email protected]"
> >> >>> >> <[email protected]> wrote:
> >> >>> >>
> >> >>> >> >Has this happened to anyone before?
> >> >>> >> >Exception in thread "main" java.util.NoSuchElementException: None.get
> >> >>> >> >       at scala.None$.get(Option.scala:313)
> >> >>> >> >       at scala.None$.get(Option.scala:311)
> >> >>> >> >       at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:65)
> >> >>> >> >       at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> >> >>> >> >
> >> >>> >> >-----Original Message-----
> >> >>> >> >From: Garry Turkington [mailto:[email protected]]
> >> >>> >> >Sent: Tuesday, March 18, 2014 4:16 PM
> >> >>> >> >To: [email protected]
> >> >>> >> >Subject: RE: Writing my Custom Job
> >> >>> >> >
> >> >>> >> >Hi,
> >> >>> >> >
> >> >>> >> >1. Specifically, since I'm using kafka I don't have to write
> >> >>> >> >Consumer and Systemfactory Classes? Correct?
> >> >>> >> >Correct
> >> >>> >> >
> >> >>> >> >2. For the SamzaStreamTask would the input be a String? i.e.
> >> >>> >> >    String event = (String)envelope.getMessage();
> >> >>> >> >
> >> >>> >> >Correct -- just make sure you have the serdes on the
> >> >>> >> >system/stream configured as string.
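> >> >>> >> >
> >> >>> >> >For example (a minimal sketch, assuming the system is named
> >> >>> >> >"kafka"):
> >> >>> >> >
> >> >>> >> >  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >> >>> >> >  systems.kafka.samza.msg.serde=string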
> >> >>> >> >
> >> >>> >> >Garry
> >> >>> >> >
> >> >>> >> >-----Original Message-----
> >> >>> >> >From: [email protected]
> >> >>> >> >[mailto:[email protected]]
> >> >>> >> >Sent: 18 March 2014 22:57
> >> >>> >> >To: [email protected]
> >> >>> >> >Subject: RE: Writing my Custom Job
> >> >>> >> >
> >> >>> >> >Hey Chris,
> >> >>> >> >
> >> >>> >> >Thanks for the quick response!
> >> >>> >> >
> >> >>> >> >So the thing is, we have the kafka producer independent of
> >>Samza.
> >> >>> >> >The idea is to test the kafka streams with different CEPs. So
> >> >>> >> >one example would be Storm. That's why I have a separate kafka
> >> >>> >> >job running that reads from a file and writes to a kafka topic.
> >> >>> >> >
> >> >>> >> >So assuming there is a topic say "input-topic" (where the
> >> >>> >> >message is some event of type "String" and key is the actual
> >> >>> >> >eventId of the
> >> >>> >> >event) already in place, I want to write a SamzaStreamTask that
> >> >>> >> >will read this string, parse it and write to another kafka
> >>topic.
> >> >>> >> >In other words, Job1 is already done independent of Samza. I'm
> >> >>> >> >working on
> >> >>> Job2 using Samza.
> >> >>> >> >
> >> >>> >> >1. Specifically, since I'm using kafka I don't have to write
> >> >>> >> >Consumer and Systemfactory Classes? Correct?
> >> >>> >> >2. For the SamzaStreamTask would the input be a String? i.e.
> >> >>> >> >    String event = (String)envelope.getMessage();
> >> >>> >> >
> >> >>> >> >Thanks!
> >> >>> >> >Sonali
> >> >>> >> >
> >> >>> >> >-----Original Message-----
> >> >>> >> >From: Chris Riccomini [mailto:[email protected]]
> >> >>> >> >Sent: Tuesday, March 18, 2014 3:16 PM
> >> >>> >> >To: [email protected]
> >> >>> >> >Subject: Re: Writing my Custom Job
> >> >>> >> >
> >> >>> >> >Hey Sonali,
> >> >>> >> >
> >> >>> >> >1. For CSV file reading, you should check this JIRA out:
> >> >>> >> >
> >> >>> >> >  https://issues.apache.org/jira/browse/SAMZA-138
> >> >>> >> >
> >> >>> >> >2. You don't need to write to a Kafka topic using the standard
> >> >>> >> >Kafka producer. You can use the collector that comes as part of
> >> >>> >> >the process method. Take a look at one of the hello-samza
> >> >>> >> >examples to see how this is done. (collector.send(...))
> >> >>> >> >
> >> >>> >> >3. To parse the string, retrieve specific fields, etc, you
> >> >>> >> >should write a second StreamTask that reads from the first. The
> >> >>> >> >flow should look
> >> >>> >>like:
> >> >>> >> >
> >> >>> >> ><file> -> Job 1 -> Kafka topic 1 -> Job 2 -> Kafka topic 2
> >> >>> >> >
> >> >>> >> >Where "Job 1" sends messages to "Kafka topic 1" partitioned by
> >> >>> >> >event ID, and "Job 2" parses and retrieves specific fields, and
> >> >>> >> >produces to "Kafka topic 2".
> >> >>> >> >
> >> >>> >> >Cheers,
> >> >>> >> >Chris
> >> >>> >> >
> >> >>> >> >On 3/18/14 2:48 PM, "[email protected]"
> >> >>> >> ><[email protected]> wrote:
> >> >>> >> >
> >> >>> >> >>Hey Guys,
> >> >>> >> >>
> >> >>> >> >>So I'm writing my custom job in Samza and wanted to make sure
> >> >>> >> >>I'm not re-inventing the wheel.
> >> >>> >> >>
> >> >>> >> >>I have a kafka job running that reads from a csv file and
> >> >>> >> >>writes to a topic. I wrote this using the kafka producer api
> >> >>> >> >>independent of
> >> >>> Samza.
> >> >>> >> >>The output is a KeyedMessage with key being my eventId and the
> >> >>> >> >>value is a string corresponding to my event.
> >> >>> >> >>
> >> >>> >> >>Now, I want to write a SamzaConsumer that listens on my topic,
> >> >>> >> >>parses the string to retrieve specific fields I'm interested in
> >> >>> >> >>and write it out to a different kafka topic.
> >> >>> >> >>
> >> >>> >> >>Are there existing classes I can leverage to do this?
> >> >>> >> >>
> >> >>> >> >>Thanks,
> >> >>> >> >>Sonali
> >> >>> >> >>
> >> >>> >> >>Sonali Parthasarathy
> >> >>> >> >>R&D Developer, Data Insights
> >> >>> >> >>Accenture Technology Labs
> >> >>> >> >>703-341-7432
> >> >>> >> >>
> >> >>> >> >>
> >> >>> >> >>________________________________
> >> >>> >> >>
> >> >>> >> >>This message is for the designated recipient only and may contain
> >> >>> >> >>privileged, proprietary, or otherwise confidential information.
> >> >>> >> >>If you have received it in error, please notify the sender
> >> >>> >> >>immediately and delete the original. Any other use of the e-mail
> >> >>> >> >>by you is prohibited. Where allowed by local law, electronic
> >> >>> >> >>communications with Accenture and its affiliates, including
> >> >>> >> >>e-mail and instant messaging (including content), may be scanned
> >> >>> >> >>by our systems for the purposes of information security and
> >> >>> >> >>assessment of internal compliance with Accenture policy.
> >> >>> >> >>________________________________
> >> >>> >> >>
> >> >>> >> >>www.accenture.com
> >> >>> >> >
> >> >>> >> >
> >> >>> >> >
> >> >>> >> >
> >> >>> >> >
> >> >>> >> >
> >> >>> >> >
> >> >>> >>
> >> >>> >>
> >> >>> >>
> >> >>> >>
> >> >>> >>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>
> >> >
> >> >
> >> >
> >> >
> >>
> >>
> >>
> >>
> >>
>
>
