Still showing the same message

On Thu, Aug 7, 2014 at 3:56 PM, Chris Riccomini <[email protected]> wrote:

> Hey Telles,
>
> The code you've posted in Produce.java shows:
>
>     KeyedMessage<String, String> data = new KeyedMessage<String, String>("consumptions", String.valueOf(key), String.valueOf(value));
>
> Which suggests that you are sending a string for both key and value. If
> you have a Samza task consuming from this topic, you should set:
>
>     systems.system-name.samza.key.serde=string
>     systems.system-name.samza.msg.serde=string
>
> Cheers,
> Chris
>
> On 8/7/14 11:52 AM, "Telles Nobrega" <[email protected]> wrote:
>
>> Hum, that sounds like a perfect reason for it.
>>
>> I'm writing to the topic with this code:
>>
>> https://github.com/tellesnobrega/kafka-producer/blob/master/src/Produce.java
>>
>> My problem is that I need to send numbers as key and value to the Kafka
>> topic so I can read them in Samza.
>>
>> What is the best way to de/serialize this?
>>
>> On Thu, Aug 7, 2014 at 3:43 PM, Yan Fang <[email protected]> wrote:
>>
>>> Hi Telles,
>>>
>>> One of the possible reasons is that, in your process method, you are trying
>>> to send a HashMap, not a String, in the collection.send. Could you check it?
>>>
>>> Thanks,
>>>
>>> Fang, Yan
>>> [email protected]
>>> +1 (206) 849-4108
>>>
>>> On Thu, Aug 7, 2014 at 11:25 AM, Telles Nobrega <[email protected]> wrote:
>>>
>>>> I changed my properties a little to look like this: link
>>>> <http://mail-archives.apache.org/mod_mbox/samza-dev/201311.mbox/%3CEA1B8C3[email protected]%3E>
>>>>
>>>> here it goes:
>>>>
>>>> # Job
>>>> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>>>> job.name=consumptions
>>>>
>>>> # YARN
>>>> yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>>>>
>>>> # Task
>>>> task.class=alarm.ConsumptionProducer
>>>> task.inputs=kafka.consumptions
>>>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>>>> task.checkpoint.system=kafka
>>>> # Normally, this would be 3, but we have only one broker.
>>>> task.checkpoint.replication.factor=1
>>>>
>>>> # Metrics
>>>> metrics.reporters=snapshot,jmx
>>>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>>>> metrics.reporter.snapshot.stream=kafka.metrics
>>>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>>>>
>>>> # Serializers
>>>> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>>>> serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>>>>
>>>> # Kafka System
>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>> systems.kafka.samza.msg.serde=string
>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181/
>>>> systems.kafka.consumer.auto.offset.reset=largest
>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>> systems.kafka.producer.producer.type=sync
>>>> # Normally, we'd set this much higher, but we want things to look snappy in the demo.
>>>> systems.kafka.producer.batch.num.messages=1
>>>> ystems.kafka.streams.metrics.samza.msg.serde=metrics
>>>>
>>>> But I'm getting this output,
>>>>
>>>> 0Caught exception in process loop.
>>>> java.lang.ClassCastException: java.util.HashMap cannot be cast to java.lang.String
>>>>     at org.apache.samza.serializers.StringSerde.toBytes(StringSerde.scala:33)
>>>>     at org.apache.samza.serializers.SerdeManager.toBytes(SerdeManager.scala:69)
>>>>     at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:65)
>>>>     at org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInstance.scala:170)
>>>>     at org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInstance.scala:170)
>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>     at org.apache.samza.container.TaskInstance.send(TaskInstance.scala:170)
>>>>     at org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala:116)
>>>>     at org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala:116)
>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>     at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>>>>     at org.apache.samza.container.RunLoop.send(RunLoop.scala:116)
>>>>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:59)
>>>>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
>>>>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
>>>>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>
>>>> On Thu, Aug 7, 2014 at 3:12 PM, Telles Nobrega <[email protected]> wrote:
>>>>
>>>>> Hi Chris, I really appreciate the time you are taking to help me out.
>>>>>
>>>>> This is job.properties file
>>>>>
>>>>> # Job
>>>>> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>>>>> job.name=consumptions
>>>>>
>>>>> # YARN
>>>>> yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>>>>>
>>>>> # Task
>>>>> task.class=alarm.ConsumptionProducer
>>>>> task.inputs=kafka.consumptions
>>>>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>>>>> task.checkpoint.system=kafka
>>>>> # Normally, this would be 3, but we have only one broker.
>>>>> task.checkpoint.replication.factor=1
>>>>>
>>>>> # Serializers
>>>>> serializers.registry.serde.class=org.apache.samza.serializers.StringSerdeFactory
>>>>>
>>>>> # Kafka System
>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>>> *systems.kafka.samza.msg.serde=json*
>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181/
>>>>> systems.kafka.consumer.auto.offset.reset=largest
>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>> systems.kafka.producer.producer.type=sync
>>>>> # Normally, we'd set this much higher, but we want things to look snappy in the demo.
>>>>> systems.kafka.producer.batch.num.messages=1
>>>>>
>>>>> *systems.kafka.streams.consumptions.key.serde=string*
>>>>> *systems.kafka.streams.consumptions.msg.serde=string*
>>>>>
>>>>> Does this look right?
>>>>> I'm running a local cluster; I want to have it running nicely before I can
>>>>> distribute it.
>>>>>
>>>>> On Thu, Aug 7, 2014 at 3:08 PM, Chris Riccomini <[email protected]> wrote:
>>>>>
>>>>>> Hey Telles,
>>>>>>
>>>>>> Sure. In your job.properties file, define the serde:
>>>>>>
>>>>>> # Serializers
>>>>>> serializers.registry.serde.class=org.apache.samza.serializers.StringSerdeFactory
>>>>>>
>>>>>> Then define the serde for your system:
>>>>>>
>>>>>> systems.kafka.samza.msg.serde=string
>>>>>>
>>>>>> Cheers,
>>>>>> Chris
>>>>>>
>>>>>> On 8/7/14 10:54 AM, "Telles Nobrega" <[email protected]> wrote:
>>>>>>
>>>>>>> Can you give an example of how to use the string serde? I'm getting an error
>>>>>>> when trying to set it to string:
>>>>>>>
>>>>>>> :53:26:804Got system producers: Set(kafka)
>>>>>>> 17:53:26:809Got serdes: Set(string)
>>>>>>> 17:53:29:206Container container_1407433587782_0001_01_000017 failed with
>>>>>>> exit code 1 - Exception from container-launch:
>>>>>>>
>>>>>>> On Thu, Aug 7, 2014 at 2:41 PM, Telles Nobrega <[email protected]> wrote:
>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> On Thu, Aug 7, 2014 at 1:54 PM, Chris Riccomini <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hey Telles,
>>>>>>>>>
>>>>>>>>> This is definitely a serde error. It sounds like your message is not
>>>>>>>>> properly formatted as a JSON blob.
>>>>>>>>>
>>>>>>>>> If you are trying to just use a string as the message (vs. a well
>>>>>>>>> formatted JSON blob), then you should use the StringSerde.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Chris
>>>>>>>>>
>>>>>>>>> On 8/7/14 8:05 AM, "Telles Nobrega" <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, I'm running a simple samza topology that reads from a kafka topic
>>>>>>>>>> that only has two Strings
>>>>>>>>>> xx:xx:xx:xxxx;xx
>>>>>>>>>> And its throwing an error
>>>>>>>>>>
>>>>>>>>>> Caught exception in process loop.
>>>>>>>>>> org.codehaus.jackson.JsonParseException: Unexpected character ('F' (code 70)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
>>>>>>>>>>  at [Source: [B@56dfb465; line: 1, column: 2]
>>>>>>>>>>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>>>>>>>>>>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>>>>>>>>>>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>>>>>>>>>>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
>>>>>>>>>>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
>>>>>>>>>>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
>>>>>>>>>>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>>>>>>>>>>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>>>>>>>>>>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>>>>>>>>>>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>>>>>>>>>>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>>>>>>>>>>     at org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$system$SystemConsumers$$poll$5.apply(SystemConsumers.scala:245)
>>>>>>>>>>     at org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$system$SystemConsumers$$poll$5.apply(SystemConsumers.scala:242)
>>>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>>>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:242)
>>>>>>>>>>     at org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(SystemConsumers.scala:180)
>>>>>>>>>>     at org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(SystemConsumers.scala:180)
>>>>>>>>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>>>>>>>>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>>>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>>>>>>>>>>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>>>>>>>>>>     at scala.collection.AbstractSet.map(Set.scala:47)
>>>>>>>>>>     at org.apache.samza.system.SystemConsumers$$anon$1.call(SystemConsumers.scala:180)
>>>>>>>>>>     at org.apache.samza.util.DoublingBackOff.maybeCall(DoublingBackOff.scala:44)
>>>>>>>>>>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:208)
>>>>>>>>>>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:73)
>>>>>>>>>>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:57)
>>>>>>>>>>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
>>>>>>>>>>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
>>>>>>>>>>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>>>>>>>
>>>>>>>>>> Has anyone experienced this error before?
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> ------------------------------------------
>>>>>>>>>> Telles Mota Vidal Nobrega
>>>>>>>>>> M.sc. Candidate at UFCG
>>>>>>>>>> B.sc. in Computer Science at UFCG
>>>>>>>>>> Software Engineer at OpenStack Project - HP/LSD-UFCG

--
------------------------------------------
Telles Mota Vidal Nobrega
M.sc. Candidate at UFCG
B.sc. in Computer Science at UFCG
Software Engineer at OpenStack Project - HP/LSD-UFCG
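
The ClassCastException quoted in this thread is raised inside StringSerde.toBytes, which suggests that the task hands a HashMap to a serde that expects a String. The following is a minimal, self-contained sketch of that situation and of one way to carry numeric keys and values as strings. `StringSerdeSketch` and its methods are hypothetical stand-ins written for illustration; they are not Samza's classes, and the sample numbers are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a string serde; NOT Samza's actual StringSerde. */
public class StringSerdeSketch {

    // Assumed behaviour, modelled on the failing frame in the trace:
    // the message is cast to String before it is encoded.
    public static byte[] toBytes(Object message) {
        return ((String) message).getBytes(StandardCharsets.UTF_8);
    }

    public static String fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Producer side: numbers go out as strings, as Produce.java does with String.valueOf.
        long key = 42L;        // illustrative value
        double value = 3.5;    // illustrative value
        byte[] keyBytes = toBytes(String.valueOf(key));
        byte[] valueBytes = toBytes(String.valueOf(value));

        // Consumer side: decode as strings, then parse back into numbers.
        long decodedKey = Long.parseLong(fromBytes(keyBytes));
        double decodedValue = Double.parseDouble(fromBytes(valueBytes));
        System.out.println(decodedKey + " -> " + decodedValue);

        // Passing a HashMap to the string serde fails with a ClassCastException.
        Map<String, Object> payload = new HashMap<>();
        payload.put("value", decodedValue);
        try {
            toBytes(payload);
        } catch (ClassCastException e) {
            System.out.println("A HashMap cannot be serialized with a string serde");
        }

        // Converting the outgoing message to a String first succeeds.
        System.out.println(fromBytes(toBytes(decodedKey + ";" + decodedValue)));
    }
}
```

In a Samza job the same idea would apply to whatever object the task passes to the collector: it has to match the serde configured for the output stream. Separately, the name given in `systems.kafka.samza.msg.serde` has to be one registered under `serializers.registry.<name>.class`.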
