Hey Telles, Can you paste the code for your StreamTask? If it's still showing the same message, then it sounds like you're still trying to send a HashMap using a StringSerde.
Cheers, Chris On 8/7/14 12:05 PM, "Telles Nobrega" <[email protected]> wrote: >Still showing the same message > > >On Thu, Aug 7, 2014 at 3:56 PM, Chris Riccomini < >[email protected]> wrote: > >> Hey Telles, >> >> The code you've posted in Produce.java shows: >> >> KeyedMessage<String, String> data = new KeyedMessage<String, >> String>("consumptions", String.valueOf(key),String.valueOf(value)); >> >> >> Which suggests that you are sending a string for both key and value. If >> you have a Samza task consuming from this topic, you should set: >> >> systems.system-name.samza.key.serde=string >> >> systems.system-name.samza.msg.serde=string >> >> >> Cheers, >> Chris >> >> On 8/7/14 11:52 AM, "Telles Nobrega" <[email protected]> wrote: >> >> >Hum, that sounds like a perfect reason for it. >> > >> >I'm writing to the topic with this code >> > >> > >> >>https://github.com/tellesnobrega/kafka-producer/blob/master/src/Produce.j >>a >> >va >> > >> >My problem is that I need to send numbers as key and value to the kafka >> >topic so i can read it in samza. >> > >> >What is the best way to de/serialize this? >> > >> > >> >On Thu, Aug 7, 2014 at 3:43 PM, Yan Fang <[email protected]> wrote: >> > >> >> Hi Telles, >> >> >> >> One of the possible reasons is that, in your process method, you are >> >>trying >> >> to send a HashMap, not a String, in the collection.send. Could you >> >>check it >> >> ? >> >> >> >> Thanks, >> >> >> >> Fang, Yan >> >> [email protected] >> >> +1 (206) 849-4108 >> >> >> >> >> >> On Thu, Aug 7, 2014 at 11:25 AM, Telles Nobrega >> >><[email protected]> >> >> wrote: >> >> >> >> > I changed my properties a little to look like this: link >> >> > < >> >> > >> >> >> >> >> >>http://mail-archives.apache.org/mod_mbox/samza-dev/201311.mbox/%3CEA1B8C3 >> >>[email protected]%3E >> >> > > >> >> > >> >> > here it goes: >> >> > >> >> > # Job >> >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory >> >> > job.name=consumptions >> >> > >> >> > # YARN >> >> > >> >> > >> >> >> >>yarn.package.path=file://${basedir}/target/${project.artifactId}-${ >> pom.ve >> >>rsion}-dist.tar.gz >> >> > >> >> > # Task >> >> > task.class=alarm.ConsumptionProducer >> >> > task.inputs=kafka.consumptions >> >> > >> >> > >> >> >> >>>>task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpoi >>>>nt >> >>ManagerFactory >> >> > task.checkpoint.system=kafka >> >> > # Normally, this would be 3, but we have only one broker. >> >> > task.checkpoint.replication.factor=1 >> >> > >> >> > # Metrics >> >> > metrics.reporters=snapshot,jmx >> >> > >> >> > >> >> >> >>>>metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.Metri >>>>cs >> >>SnapshotReporterFactory >> >> > metrics.reporter.snapshot.stream=kafka.metrics >> >> > >> >> > >> >> >> >>>>metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporte >>>>rF >> >>actory >> >> > >> >> > # Serializers >> >> > >> >> > >> >> >> >>>>serializers.registry.string.class=org.apache.samza.serializers.StringSe >>>>rd >> >>eFactory >> >> > >> >> > >> >> >> >>>>serializers.registry.metrics.class=org.apache.samza.serializers.Metrics >>>>Sn >> >>apshotSerdeFactory >> >> > >> >> > # Kafka System >> >> > >> >> > >> >> >> >>>>systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFa >>>>ct >> >>ory >> >> > systems.kafka.samza.msg.serde=string >> >> > systems.kafka.consumer.zookeeper.connect=localhost:2181/ >> >> > systems.kafka.consumer.auto.offset.reset=largest >> >> > systems.kafka.producer.metadata.broker.list=localhost:9092 >> >> > systems.kafka.producer.producer.type=sync >> >> > # Normally, we'd set this much higher, but we want things to look >> >>snappy >> >> in >> >> > the demo. >> >> > systems.kafka.producer.batch.num.messages=1 >> >> > ystems.kafka.streams.metrics.samza.msg.serde=metrics >> >> > >> >> > But I'm getting this output, >> >> > >> >> > 0Caught exception in process loop. >> >> > java.lang.ClassCastException: java.util.HashMap cannot be cast to >> >> > java.lang.String >> >> > at >> >> >>org.apache.samza.serializers.StringSerde.toBytes(StringSerde.scala:33) >> >> > at >> >> >>org.apache.samza.serializers.SerdeManager.toBytes(SerdeManager.scala:69) >> >> > at >> >> >>org.apache.samza.system.SystemProducers.send(SystemProducers.scala:65) >> >> > at >> >> > >> >> > >> >> >> >>>>org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInsta >>>>nc >> >>e.scala:170) >> >> > at >> >> > >> >> > >> >> >> >>>>org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInsta >>>>nc >> >>e.scala:170) >> >> > at >> >> > >> >> > >> >> >> >>>>scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.sc >>>>al >> >>a:59) >> >> > at >>scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> >> > at >> >>org.apache.samza.container.TaskInstance.send(TaskInstance.scala:170) >> >> > at >> >> > >> >> >> >>>>org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala: >>>>11 >> >>6) >> >> > at >> >> > >> >> >> >>>>org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala: >>>>11 >> >>6) >> >> > at scala.collection.Iterator$class.foreach(Iterator.scala:727) >> >> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >> >> > at >> >> > >> >>>>scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:20 >>>>6) >> >> > at org.apache.samza.container.RunLoop.send(RunLoop.scala:116) >> >> > at org.apache.samza.container.RunLoop.run(RunLoop.scala:59) >> >> > at >> >> >>org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504) >> >> > at >> >> > >> >>>>org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81 >>>>) >> >> > at >> >>org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala) >> >> > >> >> > >> >> > >> >> > On Thu, Aug 7, 2014 at 3:12 PM, Telles Nobrega >> >><[email protected]> >> >> > wrote: >> >> > >> >> > > Hi Chris, I really appreciate the time you are taking to help me >> >>out. >> >> > > >> >> > > This is job.properties file >> >> > > >> >> > > # Job >> >> > > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory >> >> > > job.name=consumptions >> >> > > >> >> > > # YARN >> >> > > >> >> > > >> >> > >> >> >> >>yarn.package.path=file://${basedir}/target/${project.artifactId}-${ >> pom.ve >> >>rsion}-dist.tar.gz >> >> > > >> >> > > # Task >> >> > > task.class=alarm.ConsumptionProducer >> >> > > task.inputs=kafka.consumptions >> >> > > >> >> > > >> >> > >> >> >> >>>>task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpoi >>>>nt >> >>ManagerFactory >> >> > > task.checkpoint.system=kafka >> >> > > # Normally, this would be 3, but we have only one broker. >> >> > > task.checkpoint.replication.factor=1 >> >> > > >> >> > > # Serializers >> >> > > >> >> > > >> >> > >> >> >> >>>>serializers.registry.serde.class=org.apache.samza.serializers.StringSer >>>>de >> >>Factory >> >> > > >> >> > > # Kafka System >> >> > > >> >> > > >> >> > >> >> >> >>>>systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFa >>>>ct >> >>ory >> >> > > *systems.kafka.samza.msg.serde=json* >> >> > > systems.kafka.consumer.zookeeper.connect=localhost:2181/ >> >> > > systems.kafka.consumer.auto.offset.reset=largest >> >> > > systems.kafka.producer.metadata.broker.list=localhost:9092 >> >> > > systems.kafka.producer.producer.type=sync >> >> > > # Normally, we'd set this much higher, but we want things to look >> >> snappy >> >> > > in the demo. >> >> > > systems.kafka.producer.batch.num.messages=1 >> >> > > >> >> > > *systems.kafka.streams.consumptions.key.serde=string* >> >> > > *systems.kafka.streams.consumptions.msg.serde=string* >> >> > > >> >> > > Does this look right? >> >> > > I'm running a local cluster, I want to have it running nicely >> >>before I >> >> > can >> >> > > distribute it. >> >> > > >> >> > > >> >> > > >> >> > > On Thu, Aug 7, 2014 at 3:08 PM, Chris Riccomini < >> >> > > [email protected]> wrote: >> >> > > >> >> > >> Hey Telles, >> >> > >> >> >> > >> Sure. In your job.properties file, define the serde: >> >> > >> >> >> > >> # Serializers >> >> > >> >> >> > >> >> >> > >> >> >> >>>>serializers.registry.serde.class=org.apache.samza.serializers.StringSer >>>>de >> >>Fa >> >> > >> ctory >> >> > >> >> >> > >> >> >> > >> Then define the serde for your system: >> >> > >> >> >> > >> systems.kafka.samza.msg.serde=string >> >> > >> >> >> > >> >> >> > >> Cheers, >> >> > >> Chris >> >> > >> >> >> > >> On 8/7/14 10:54 AM, "Telles Nobrega" <[email protected]> >> >>wrote: >> >> > >> >> >> > >> >Can you give and example on how to use string serde, i'm >>getting >> >>an >> >> > error >> >> > >> >when trying to set to string >> >> > >> > >> >> > >> >:53:26:804Got system producers: Set(kafka) >> >> > >> >17:53:26:809Got serdes: Set(string) >> >> > >> >17:53:29:206Container container_1407433587782_0001_01_000017 >> >>failed >> >> > with >> >> > >> >exit code 1 - Exception from container-launch: >> >> > >> > >> >> > >> > >> >> > >> > >> >> > >> >On Thu, Aug 7, 2014 at 2:41 PM, Telles Nobrega < >> >> > [email protected]> >> >> > >> >wrote: >> >> > >> > >> >> > >> >> Thanks. >> >> > >> >> >> >> > >> >> >> >> > >> >> On Thu, Aug 7, 2014 at 1:54 PM, Chris Riccomini < >> >> > >> >> [email protected]> wrote: >> >> > >> >> >> >> > >> >>> Hey Telles, >> >> > >> >>> >> >> > >> >>> This is definitely a serde error. It sounds like your >>message >> >>is >> >> not >> >> > >> >>> properly formatted as a JSON blob. >> >> > >> >>> >> >> > >> >>> If you are trying to just use a string as the message (vs. a >> >>well >> >> > >> >>> formatted JSON blob), then you should use the StringSerde. >> >> > >> >>> >> >> > >> >>> Cheers, >> >> > >> >>> Chris >> >> > >> >>> >> >> > >> >>> On 8/7/14 8:05 AM, "Telles Nobrega" >><[email protected]> >> >> > wrote: >> >> > >> >>> >> >> > >> >>> >Hi, I'm running a simple samza topology that reads from a >> >>kafka >> >> > >> topic >> >> > >> >>> >that >> >> > >> >>> >only has two Strings >> >> > >> >>> >xx:xx:xx:xxxx;xx >> >> > >> >>> >And its throwing an error >> >> > >> >>> > >> >> > >> >>> >Caught exception in process loop. >> >> > >> >>> >org.codehaus.jackson.JsonParseException: Unexpected >>character >> >> ('F' >> >> > >> >>>(code >> >> > >> >>> >70)): expected a valid value (number, String, array, >>object, >> >> > 'true', >> >> > >> >>> >'false' or 'null') >> >> > >> >>> > at [Source: [B@56dfb465; line: 1, column: 2] >> >> > >> >>> >at >> >> > >> >> >> >>>>>org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291) >> >> > >> >>> > at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonPa >>>>>>>>rs >> >>>>>>er >> >> > >> >>>>Min >> >> > >> >>> >imalBase.java:385) >> >> > >> >>> >at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedCh >>>>>>>>ar >> >>>>>>(J >> >> > >> >>>>son >> >> > >> >>> >ParserMinimalBase.java:306) >> >> > >> >>> > at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(U >>>>>>>>tf >> >>>>>>8S >> >> > >> >>>>tre >> >> > >> >>> >amParser.java:1581) >> >> > >> >>> >at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Ut >>>>>>>>f8 >> >>>>>>St >> >> > >> >>>>rea >> >> > >> >>> >mParser.java:436) >> >> > >> >>> > at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamPars >>>>>>>>er >> >>>>>>.j >> >> > >> >>>>ava >> >> > >> >>> >:322) >> >> > >> >>> >at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper. >>>>>>>>ja >> >>>>>>va >> >> > >> >>>>:24 >> >> > >> >>> >32) >> >> > >> >>> > at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper >>>>>>>>.j >> >>>>>>av >> >> > >> >>>>a:2 >> >> > >> >>> >389) >> >> > >> >>> >at >> >> > >> >>> >> >> > >> >> >> >>org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667) >> >> > >> >>> > at >> >> > >> >> >> >>>>>org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33) >> >> > >> >>> >at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.sc >>>>>>>>al >> >>>>>>a: >> >> > >> >>>>115 >> >> > >> >>> >) >> >> > >> >>> > at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$s >>>>>>>>ys >> >>>>>>te >> >> > >> >>>>m$S >> >> > >> >>> >ystemConsumers$$poll$5.apply(SystemConsumers.scala:245) >> >> > >> >>> > at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$s >>>>>>>>ys >> >>>>>>te >> >> > >> >>>>m$S >> >> > >> >>> >ystemConsumers$$poll$5.apply(SystemConsumers.scala:242) >> >> > >> >>> >at >>scala.collection.Iterator$class.foreach(Iterator.scala:727) >> >> > >> >>> > at >> >> scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >> >> > >> >>> >at >> >> > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >> >> > >> >>> > at >> >>scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> >> > >> >>> >at org.apache.samza.system.SystemConsumers.org >> >> > >> >>> >> >> > >>>$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:242) >> >> > >> >>> > at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.app >>>>>>>>ly >> >>>>>>(S >> >> > >> >>>>yst >> >> > >> >>> >emConsumers.scala:180) >> >> > >> >>> >at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.app >>>>>>>>ly >> >>>>>>(S >> >> > >> >>>>yst >> >> > >> >>> >emConsumers.scala:180) >> >> > >> >>> > at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLi >>>>>>>>ke >> >>>>>>.s >> >> > >> >>>>cal >> >> > >> >>> >a:244) >> >> > >> >>> >at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLi >>>>>>>>ke >> >>>>>>.s >> >> > >> >>>>cal >> >> > >> >>> >a:244) >> >> > >> >>> > at >> >>scala.collection.Iterator$class.foreach(Iterator.scala:727) >> >> > >> >>> >at >> >>scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >> >> > >> >>> > at >> >> > scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174) >> >> > >> >>> >at >> >> > >> >> >> >>>>>scala.collection.TraversableLike$class.map(TraversableLike.scala:244) >> >> > >> >>> > at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Se >>>>>>>>t. >> >>>>>>sc >> >> > >> >>>>ala >> >> > >> >>> >:47) >> >> > >> >>> >at scala.collection.SetLike$class.map(SetLike.scala:93) >> >> > >> >>> > at scala.collection.AbstractSet.map(Set.scala:47) >> >> > >> >>> >at >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.apache.samza.system.SystemConsumers$$anon$1.call(SystemConsumer >>>>>>>>s. >> >>>>>>sc >> >> > >> >>>>ala >> >> > >> >>> >:180) >> >> > >> >>> > at >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.apache.samza.util.DoublingBackOff.maybeCall(DoublingBackOff.sca >>>>>>>>la >> >>>>>>:4 >> >> > >> >>>>4) >> >> > >> >>> >at >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scal >>>>>>>>a: >> >>>>>>20 >> >> > >> >>>>8) >> >> > >> >>> > at >> >>org.apache.samza.container.RunLoop.process(RunLoop.scala:73) >> >> > >> >>> >at org.apache.samza.container.RunLoop.run(RunLoop.scala:57) >> >> > >> >>> > at >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala: >>>>>>>>50 >> >>>>>>4) >> >> > >> >>> >at >> >> > >> >>> >> >> > >> >> >> > >> >> >> > >> >> >> >>>>>>>>org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scal >>>>>>>>a: >> >>>>>>81 >> >> > >> >>>>) >> >> > >> >>> > at >> >> > >> >> >> >>>>>org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala) >> >> > >> >>> > >> >> > >> >>> > >> >> > >> >>> > >> >> > >> >>> >Has anyone experienced this error before? >> >> > >> >>> > >> >> > >> >>> >-- >> >> > >> >>> >------------------------------------------ >> >> > >> >>> >Telles Mota Vidal Nobrega >> >> > >> >>> >M.sc. Candidate at UFCG >> >> > >> >>> >B.sc. in Computer Science at UFCG >> >> > >> >>> >Software Engineer at OpenStack Project - HP/LSD-UFCG >> >> > >> >>> >> >> > >> >>> >> >> > >> >> >> >> > >> >> >> >> > >> >> -- >> >> > >> >> ------------------------------------------ >> >> > >> >> Telles Mota Vidal Nobrega >> >> > >> >> M.sc. Candidate at UFCG >> >> > >> >> B.sc. in Computer Science at UFCG >> >> > >> >> Software Engineer at OpenStack Project - HP/LSD-UFCG >> >> > >> >> >> >> > >> > >> >> > >> > >> >> > >> > >> >> > >> >-- >> >> > >> >------------------------------------------ >> >> > >> >Telles Mota Vidal Nobrega >> >> > >> >M.sc. Candidate at UFCG >> >> > >> >B.sc. in Computer Science at UFCG >> >> > >> >Software Engineer at OpenStack Project - HP/LSD-UFCG >> >> > >> >> >> > >> >> >> > > >> >> > > >> >> > > -- >> >> > > ------------------------------------------ >> >> > > Telles Mota Vidal Nobrega >> >> > > M.sc. Candidate at UFCG >> >> > > B.sc. in Computer Science at UFCG >> >> > > Software Engineer at OpenStack Project - HP/LSD-UFCG >> >> > > >> >> > >> >> > >> >> > >> >> > -- >> >> > ------------------------------------------ >> >> > Telles Mota Vidal Nobrega >> >> > M.sc. Candidate at UFCG >> >> > B.sc. in Computer Science at UFCG >> >> > Software Engineer at OpenStack Project - HP/LSD-UFCG >> >> > >> >> >> > >> > >> > >> >-- >> >------------------------------------------ >> >Telles Mota Vidal Nobrega >> >M.sc. Candidate at UFCG >> >B.sc. in Computer Science at UFCG >> >Software Engineer at OpenStack Project - HP/LSD-UFCG >> >> > > >-- >------------------------------------------ >Telles Mota Vidal Nobrega >M.sc. Candidate at UFCG >B.sc. in Computer Science at UFCG >Software Engineer at OpenStack Project - HP/LSD-UFCG
