It's still showing the same error message.

On Thu, Aug 7, 2014 at 3:56 PM, Chris Riccomini <
[email protected]> wrote:

> Hey Telles,
>
> The code you've posted in Produce.java shows:
>
>             KeyedMessage<String, String> data = new KeyedMessage<String, String>("consumptions", String.valueOf(key), String.valueOf(value));
>
>
> Which suggests that you are sending a string for both key and value. If
> you have a Samza task consuming from this topic, you should set:
>
> systems.system-name.samza.key.serde=string
>
> systems.system-name.samza.msg.serde=string
>
>
> Cheers,
> Chris
>
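
A minimal sketch of how numbers sent as strings could be turned back into numbers on the consuming side, assuming the string serde hands the task UTF-8 text. NumericStringCodec and its methods are hypothetical names for illustration only; they are not part of Samza or Kafka:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Samza or Kafka. It imitates what a UTF-8
// string serde does to a value on the wire (the UTF-8 encoding is an assumption).
class NumericStringCodec {

    // Producer side: number -> text -> bytes, like String.valueOf(...) followed
    // by a string encoder.
    static byte[] encode(long value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: bytes -> text (the serde's job) -> number (the task's job).
    static long decode(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        return Long.parseLong(text.trim());
    }

    public static void main(String[] args) {
        byte[] wire = encode(1234L);
        System.out.println(decode(wire)); // prints 1234
    }
}
```

With key.serde and msg.serde both set to string, the task would receive String objects and only the parsing step (Long.parseLong here) would be left to the task code.
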
> On 8/7/14 11:52 AM, "Telles Nobrega" <[email protected]> wrote:
>
> >Hum, that sounds like a perfect reason for it.
> >
> >I'm writing to the topic with this code:
> >
> >https://github.com/tellesnobrega/kafka-producer/blob/master/src/Produce.java
> >
> >My problem is that I need to send numbers as the key and value to the Kafka
> >topic so I can read them in Samza.
> >
> >What is the best way to serialize and deserialize them?
> >
> >
> >On Thu, Aug 7, 2014 at 3:43 PM, Yan Fang <[email protected]> wrote:
> >
> >> Hi Telles,
> >>
> >> One possible reason is that, in your process method, you are trying to
> >> send a HashMap, not a String, in collector.send. Could you check it?
> >>
> >> Thanks,
> >>
> >> Fang, Yan
> >> [email protected]
> >> +1 (206) 849-4108
> >>
> >>
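
A rough sketch of the kind of change this points to, assuming the task currently builds a map and passes it on while the output stream uses a string serde. MapToText and the key=value;key=value layout are hypothetical choices for illustration, not something Samza prescribes:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: flattens a map into one line of text so that a
// string-only serde receives a String instead of a HashMap.
class MapToText {

    // Joins entries as key=value pairs separated by ';'.
    static String flatten(Map<String, ?> fields) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, ?> entry : fields.entrySet()) {
            if (out.length() > 0) {
                out.append(';');
            }
            out.append(entry.getKey()).append('=').append(entry.getValue());
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> reading = new LinkedHashMap<String, Object>();
        reading.put("meter", 17);
        reading.put("consumption", 42.5);

        // Handing the map itself to code that casts it to String is what
        // fails at runtime with a ClassCastException; send the text instead.
        System.out.println(flatten(reading)); // prints meter=17;consumption=42.5
    }
}
```

The other option would be to keep sending the map and configure a JSON serde for the output stream instead of the string serde.
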
> >> On Thu, Aug 7, 2014 at 11:25 AM, Telles Nobrega <[email protected]> wrote:
> >>
> >> > I changed my properties a little to look like this: link
> >> > <http://mail-archives.apache.org/mod_mbox/samza-dev/201311.mbox/%3CEA1B8C3[email protected]%3E>
> >> >
> >> > here it goes:
> >> >
> >> > # Job
> >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> >> > job.name=consumptions
> >> >
> >> > # YARN
> >> > yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
> >> >
> >> > # Task
> >> > task.class=alarm.ConsumptionProducer
> >> > task.inputs=kafka.consumptions
> >> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> >> > task.checkpoint.system=kafka
> >> > # Normally, this would be 3, but we have only one broker.
> >> > task.checkpoint.replication.factor=1
> >> >
> >> > # Metrics
> >> > metrics.reporters=snapshot,jmx
> >> > metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
> >> > metrics.reporter.snapshot.stream=kafka.metrics
> >> > metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
> >> >
> >> > # Serializers
> >> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >> > serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
> >> >
> >> > # Kafka System
> >> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >> > systems.kafka.samza.msg.serde=string
> >> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
> >> > systems.kafka.consumer.auto.offset.reset=largest
> >> > systems.kafka.producer.metadata.broker.list=localhost:9092
> >> > systems.kafka.producer.producer.type=sync
> >> > # Normally, we'd set this much higher, but we want things to look snappy in the demo.
> >> > systems.kafka.producer.batch.num.messages=1
> >> > ystems.kafka.streams.metrics.samza.msg.serde=metrics
> >> >
> >> > But I'm getting this output,
> >> >
> >> > 0Caught exception in process loop.
> >> > java.lang.ClassCastException: java.util.HashMap cannot be cast to java.lang.String
> >> >     at org.apache.samza.serializers.StringSerde.toBytes(StringSerde.scala:33)
> >> >     at org.apache.samza.serializers.SerdeManager.toBytes(SerdeManager.scala:69)
> >> >     at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:65)
> >> >     at org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInstance.scala:170)
> >> >     at org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInstance.scala:170)
> >> >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >> >     at org.apache.samza.container.TaskInstance.send(TaskInstance.scala:170)
> >> >     at org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala:116)
> >> >     at org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala:116)
> >> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> >     at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >> >     at org.apache.samza.container.RunLoop.send(RunLoop.scala:116)
> >> >     at org.apache.samza.container.RunLoop.run(RunLoop.scala:59)
> >> >     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
> >> >     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
> >> >     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> >
> >> >
> >> >
> >> > On Thu, Aug 7, 2014 at 3:12 PM, Telles Nobrega
> >><[email protected]>
> >> > wrote:
> >> >
> >> > > Hi Chris, I really appreciate the time you are taking to help me out.
> >> > >
> >> > > This is my job.properties file:
> >> > >
> >> > > # Job
> >> > > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> >> > > job.name=consumptions
> >> > >
> >> > > # YARN
> >> > > yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
> >> > >
> >> > > # Task
> >> > > task.class=alarm.ConsumptionProducer
> >> > > task.inputs=kafka.consumptions
> >> > > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> >> > > task.checkpoint.system=kafka
> >> > > # Normally, this would be 3, but we have only one broker.
> >> > > task.checkpoint.replication.factor=1
> >> > >
> >> > > # Serializers
> >> > > serializers.registry.serde.class=org.apache.samza.serializers.StringSerdeFactory
> >> > >
> >> > > # Kafka System
> >> > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >> > > *systems.kafka.samza.msg.serde=json*
> >> > > systems.kafka.consumer.zookeeper.connect=localhost:2181/
> >> > > systems.kafka.consumer.auto.offset.reset=largest
> >> > > systems.kafka.producer.metadata.broker.list=localhost:9092
> >> > > systems.kafka.producer.producer.type=sync
> >> > > # Normally, we'd set this much higher, but we want things to look snappy in the demo.
> >> > > systems.kafka.producer.batch.num.messages=1
> >> > >
> >> > > *systems.kafka.streams.consumptions.key.serde=string*
> >> > > *systems.kafka.streams.consumptions.msg.serde=string*
> >> > >
> >> > > Does this look right?
> >> > > I'm running a local cluster; I want to have it running nicely before I
> >> > > distribute it.
> >> > >
> >> > >
> >> > >
> >> > > On Thu, Aug 7, 2014 at 3:08 PM, Chris Riccomini <
> >> > > [email protected]> wrote:
> >> > >
> >> > >> Hey Telles,
> >> > >>
> >> > >> Sure. In your job.properties file, define the serde:
> >> > >>
> >> > >> # Serializers
> >> > >> serializers.registry.serde.class=org.apache.samza.serializers.StringSerdeFactory
> >> > >>
> >> > >>
> >> > >> Then define the serde for your system:
> >> > >>
> >> > >> systems.kafka.samza.msg.serde=string
> >> > >>
> >> > >>
> >> > >> Cheers,
> >> > >> Chris
> >> > >>
> >> > >> On 8/7/14 10:54 AM, "Telles Nobrega" <[email protected]>
> >>wrote:
> >> > >>
> >> > >> >Can you give an example of how to use the string serde? I'm getting an
> >> > >> >error when trying to set it to string:
> >> > >> >
> >> > >> >:53:26:804Got system producers: Set(kafka)
> >> > >> >17:53:26:809Got serdes: Set(string)
> >> > >> >17:53:29:206Container container_1407433587782_0001_01_000017 failed with exit code 1 - Exception from container-launch:
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> >On Thu, Aug 7, 2014 at 2:41 PM, Telles Nobrega <
> >> > [email protected]>
> >> > >> >wrote:
> >> > >> >
> >> > >> >> Thanks.
> >> > >> >>
> >> > >> >>
> >> > >> >> On Thu, Aug 7, 2014 at 1:54 PM, Chris Riccomini <
> >> > >> >> [email protected]> wrote:
> >> > >> >>
> >> > >> >>> Hey Telles,
> >> > >> >>>
> >> > >> >>> This is definitely a serde error. It sounds like your message is not
> >> > >> >>> properly formatted as a JSON blob.
> >> > >> >>>
> >> > >> >>> If you are trying to just use a string as the message (vs. a
> >> > >> >>> well-formatted JSON blob), then you should use the StringSerde.
> >> > >> >>>
> >> > >> >>> Cheers,
> >> > >> >>> Chris
> >> > >> >>>
> >> > >> >>> On 8/7/14 8:05 AM, "Telles Nobrega" <[email protected]>
> >> > wrote:
> >> > >> >>>
> >> > >> >>> >Hi, I'm running a simple Samza topology that reads from a Kafka topic
> >> > >> >>> >that only has two Strings
> >> > >> >>> >xx:xx:xx:xxxx;xx
> >> > >> >>> >and it's throwing an error:
> >> > >> >>> >
> >> > >> >>> >Caught exception in process loop.
> >> > >> >>> >org.codehaus.jackson.JsonParseException: Unexpected character ('F' (code 70)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
> >> > >> >>> > at [Source: [B@56dfb465; line: 1, column: 2]
> >> > >> >>> >    at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
> >> > >> >>> >    at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
> >> > >> >>> >    at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
> >> > >> >>> >    at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
> >> > >> >>> >    at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
> >> > >> >>> >    at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
> >> > >> >>> >    at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
> >> > >> >>> >    at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
> >> > >> >>> >    at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
> >> > >> >>> >    at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
> >> > >> >>> >    at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
> >> > >> >>> >    at org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$system$SystemConsumers$$poll$5.apply(SystemConsumers.scala:245)
> >> > >> >>> >    at org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$system$SystemConsumers$$poll$5.apply(SystemConsumers.scala:242)
> >> > >> >>> >    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> > >> >>> >    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> > >> >>> >    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> > >> >>> >    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >> > >> >>> >    at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:242)
> >> > >> >>> >    at org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(SystemConsumers.scala:180)
> >> > >> >>> >    at org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(SystemConsumers.scala:180)
> >> > >> >>> >    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >> > >> >>> >    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >> > >> >>> >    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> > >> >>> >    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> > >> >>> >    at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
> >> > >> >>> >    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >> > >> >>> >    at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> >> > >> >>> >    at scala.collection.SetLike$class.map(SetLike.scala:93)
> >> > >> >>> >    at scala.collection.AbstractSet.map(Set.scala:47)
> >> > >> >>> >    at org.apache.samza.system.SystemConsumers$$anon$1.call(SystemConsumers.scala:180)
> >> > >> >>> >    at org.apache.samza.util.DoublingBackOff.maybeCall(DoublingBackOff.scala:44)
> >> > >> >>> >    at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:208)
> >> > >> >>> >    at org.apache.samza.container.RunLoop.process(RunLoop.scala:73)
> >> > >> >>> >    at org.apache.samza.container.RunLoop.run(RunLoop.scala:57)
> >> > >> >>> >    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
> >> > >> >>> >    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
> >> > >> >>> >    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> > >> >>> >
> >> > >> >>> >
> >> > >> >>> >
> >> > >> >>> >Has anyone experienced this error before?
> >> > >> >>> >
> >> > >> >>> >--
> >> > >> >>> >------------------------------------------
> >> > >> >>> >Telles Mota Vidal Nobrega
> >> > >> >>> >M.sc. Candidate at UFCG
> >> > >> >>> >B.sc. in Computer Science at UFCG
> >> > >> >>> >Software Engineer at OpenStack Project - HP/LSD-UFCG
> >> > >> >>>
> >> > >> >>>
> >> > >> >>
> >> > >> >>
> >> > >> >> --
> >> > >> >> ------------------------------------------
> >> > >> >> Telles Mota Vidal Nobrega
> >> > >> >> M.sc. Candidate at UFCG
> >> > >> >> B.sc. in Computer Science at UFCG
> >> > >> >> Software Engineer at OpenStack Project - HP/LSD-UFCG
> >> > >> >>
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> >--
> >> > >> >------------------------------------------
> >> > >> >Telles Mota Vidal Nobrega
> >> > >> >M.sc. Candidate at UFCG
> >> > >> >B.sc. in Computer Science at UFCG
> >> > >> >Software Engineer at OpenStack Project - HP/LSD-UFCG
> >> > >>
> >> > >>
> >> > >
> >> > >
> >> > > --
> >> > > ------------------------------------------
> >> > > Telles Mota Vidal Nobrega
> >> > > M.sc. Candidate at UFCG
> >> > > B.sc. in Computer Science at UFCG
> >> > > Software Engineer at OpenStack Project - HP/LSD-UFCG
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > ------------------------------------------
> >> > Telles Mota Vidal Nobrega
> >> > M.sc. Candidate at UFCG
> >> > B.sc. in Computer Science at UFCG
> >> > Software Engineer at OpenStack Project - HP/LSD-UFCG
> >> >
> >>
> >
> >
> >
> >--
> >------------------------------------------
> >Telles Mota Vidal Nobrega
> >M.sc. Candidate at UFCG
> >B.sc. in Computer Science at UFCG
> >Software Engineer at OpenStack Project - HP/LSD-UFCG
>
>


-- 
------------------------------------------
Telles Mota Vidal Nobrega
M.sc. Candidate at UFCG
B.sc. in Computer Science at UFCG
Software Engineer at OpenStack Project - HP/LSD-UFCG
