Hmm, that sounds like the likely reason. I'm writing to the topic with this code:
https://github.com/tellesnobrega/kafka-producer/blob/master/src/Produce.java

My problem is that I need to send numbers as the key and value to the Kafka topic so I can read them in Samza. What is the best way to serialize and deserialize them?


On Thu, Aug 7, 2014 at 3:43 PM, Yan Fang <[email protected]> wrote:

> Hi Telles,
>
> One of the possible reasons is that, in your process method, you are trying
> to send a HashMap, not a String, in the collection.send. Could you check it?
>
> Thanks,
>
> Fang, Yan
> [email protected]
> +1 (206) 849-4108
>
> On Thu, Aug 7, 2014 at 11:25 AM, Telles Nobrega <[email protected]>
> wrote:
>
> > I changed my properties a little to look like this: link
> > <http://mail-archives.apache.org/mod_mbox/samza-dev/201311.mbox/%[email protected]%3E>
> >
> > here it goes:
> >
> > # Job
> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > job.name=consumptions
> >
> > # YARN
> > yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
> >
> > # Task
> > task.class=alarm.ConsumptionProducer
> > task.inputs=kafka.consumptions
> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > task.checkpoint.system=kafka
> > # Normally, this would be 3, but we have only one broker.
> > task.checkpoint.replication.factor=1
> >
> > # Metrics
> > metrics.reporters=snapshot,jmx
> > metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
> > metrics.reporter.snapshot.stream=kafka.metrics
> > metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
> >
> > # Serializers
> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> > serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
> >
> > # Kafka System
> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > systems.kafka.samza.msg.serde=string
> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
> > systems.kafka.consumer.auto.offset.reset=largest
> > systems.kafka.producer.metadata.broker.list=localhost:9092
> > systems.kafka.producer.producer.type=sync
> > # Normally, we'd set this much higher, but we want things to look snappy in the demo.
> > systems.kafka.producer.batch.num.messages=1
> > ystems.kafka.streams.metrics.samza.msg.serde=metrics
> >
> > But I'm getting this output,
> >
> > 0Caught exception in process loop.
> > java.lang.ClassCastException: java.util.HashMap cannot be cast to java.lang.String
> > at org.apache.samza.serializers.StringSerde.toBytes(StringSerde.scala:33)
> > at org.apache.samza.serializers.SerdeManager.toBytes(SerdeManager.scala:69)
> > at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:65)
> > at org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInstance.scala:170)
> > at org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInstance.scala:170)
> > at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > at org.apache.samza.container.TaskInstance.send(TaskInstance.scala:170)
> > at org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala:116)
> > at org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala:116)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > at org.apache.samza.container.RunLoop.send(RunLoop.scala:116)
> > at org.apache.samza.container.RunLoop.run(RunLoop.scala:59)
> > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
> > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
> > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >
> > On Thu, Aug 7, 2014 at 3:12 PM, Telles Nobrega <[email protected]>
> > wrote:
> >
> > > Hi Chris, I really appreciate the time you are taking to help me out.
> > >
> > > This is job.properties file
> > >
> > > # Job
> > > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > > job.name=consumptions
> > >
> > > # YARN
> > > yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
> > >
> > > # Task
> > > task.class=alarm.ConsumptionProducer
> > > task.inputs=kafka.consumptions
> > > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > > task.checkpoint.system=kafka
> > > # Normally, this would be 3, but we have only one broker.
> > > task.checkpoint.replication.factor=1
> > >
> > > # Serializers
> > > serializers.registry.serde.class=org.apache.samza.serializers.StringSerdeFactory
> > >
> > > # Kafka System
> > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > > *systems.kafka.samza.msg.serde=json*
> > > systems.kafka.consumer.zookeeper.connect=localhost:2181/
> > > systems.kafka.consumer.auto.offset.reset=largest
> > > systems.kafka.producer.metadata.broker.list=localhost:9092
> > > systems.kafka.producer.producer.type=sync
> > > # Normally, we'd set this much higher, but we want things to look snappy in the demo.
> > > systems.kafka.producer.batch.num.messages=1
> > >
> > > *systems.kafka.streams.consumptions.key.serde=string*
> > > *systems.kafka.streams.consumptions.msg.serde=string*
> > >
> > > Does this look right?
> > > I'm running a local cluster, I want to have it running nicely before I can
> > > distribute it.
> > >
> > > On Thu, Aug 7, 2014 at 3:08 PM, Chris Riccomini <[email protected]> wrote:
> > >
> > >> Hey Telles,
> > >>
> > >> Sure.
> > >> In your job.properties file, define the serde:
> > >>
> > >> # Serializers
> > >> serializers.registry.serde.class=org.apache.samza.serializers.StringSerdeFactory
> > >>
> > >> Then define the serde for your system:
> > >>
> > >> systems.kafka.samza.msg.serde=string
> > >>
> > >> Cheers,
> > >> Chris
> > >>
> > >> On 8/7/14 10:54 AM, "Telles Nobrega" <[email protected]> wrote:
> > >>
> > >> >Can you give and example on how to use string serde, i'm getting an error
> > >> >when trying to set to string
> > >> >
> > >> >:53:26:804Got system producers: Set(kafka)
> > >> >17:53:26:809Got serdes: Set(string)
> > >> >17:53:29:206Container container_1407433587782_0001_01_000017 failed with
> > >> >exit code 1 - Exception from container-launch:
> > >> >
> > >> >On Thu, Aug 7, 2014 at 2:41 PM, Telles Nobrega <[email protected]>
> > >> >wrote:
> > >> >
> > >> >> Thanks.
> > >> >>
> > >> >> On Thu, Aug 7, 2014 at 1:54 PM, Chris Riccomini <[email protected]> wrote:
> > >> >>
> > >> >>> Hey Telles,
> > >> >>>
> > >> >>> This is definitely a serde error. It sounds like your message is not
> > >> >>> properly formatted as a JSON blob.
> > >> >>>
> > >> >>> If you are trying to just use a string as the message (vs. a well
> > >> >>> formatted JSON blob), then you should use the StringSerde.
> > >> >>>
> > >> >>> Cheers,
> > >> >>> Chris
> > >> >>>
> > >> >>> On 8/7/14 8:05 AM, "Telles Nobrega" <[email protected]> wrote:
> > >> >>>
> > >> >>> >Hi, I'm running a simple samza topology that reads from a kafka topic
> > >> >>> >that only has two Strings
> > >> >>> >xx:xx:xx:xxxx;xx
> > >> >>> >And its throwing an error
> > >> >>> >
> > >> >>> >Caught exception in process loop.
> > >> >>> >org.codehaus.jackson.JsonParseException: Unexpected character ('F' (code 70)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
> > >> >>> > at [Source: [B@56dfb465; line: 1, column: 2]
> > >> >>> > at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
> > >> >>> > at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
> > >> >>> > at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
> > >> >>> > at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
> > >> >>> > at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
> > >> >>> > at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
> > >> >>> > at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
> > >> >>> > at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
> > >> >>> > at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
> > >> >>> > at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
> > >> >>> > at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
> > >> >>> > at org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$system$SystemConsumers$$poll$5.apply(SystemConsumers.scala:245)
> > >> >>> > at org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$system$SystemConsumers$$poll$5.apply(SystemConsumers.scala:242)
> > >> >>> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > >> >>> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > >> >>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > >> >>> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > >> >>> > at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:242)
> > >> >>> > at org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(SystemConsumers.scala:180)
> > >> >>> > at org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(SystemConsumers.scala:180)
> > >> >>> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > >> >>> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > >> >>> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > >> >>> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > >> >>> > at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
> > >> >>> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > >> >>> > at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> > >> >>> > at scala.collection.SetLike$class.map(SetLike.scala:93)
> > >> >>> > at scala.collection.AbstractSet.map(Set.scala:47)
> > >> >>> > at org.apache.samza.system.SystemConsumers$$anon$1.call(SystemConsumers.scala:180)
> > >> >>> > at org.apache.samza.util.DoublingBackOff.maybeCall(DoublingBackOff.scala:44)
> > >> >>> > at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:208)
> > >> >>> > at org.apache.samza.container.RunLoop.process(RunLoop.scala:73)
> > >> >>> > at org.apache.samza.container.RunLoop.run(RunLoop.scala:57)
> > >> >>> > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
> > >> >>> > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
> > >> >>> > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> > >> >>> >
> > >> >>> >Has anyone experienced this error before?
> > >> >>> >
> > >> >>> >--
> > >> >>> >------------------------------------------
> > >> >>> >Telles Mota Vidal Nobrega
> > >> >>> >M.sc. Candidate at UFCG
> > >> >>> >B.sc. in Computer Science at UFCG
> > >> >>> >Software Engineer at OpenStack Project - HP/LSD-UFCG
> > >> >>
> > >> >> --
> > >> >> ------------------------------------------
> > >> >> Telles Mota Vidal Nobrega
> > >> >> M.sc. Candidate at UFCG
> > >> >> B.sc. in Computer Science at UFCG
> > >> >> Software Engineer at OpenStack Project - HP/LSD-UFCG
> > >> >
> > >> >--
> > >> >------------------------------------------
> > >> >Telles Mota Vidal Nobrega
> > >> >M.sc. Candidate at UFCG
> > >> >B.sc. in Computer Science at UFCG
> > >> >Software Engineer at OpenStack Project - HP/LSD-UFCG
> > >
> > > --
> > > ------------------------------------------
> > > Telles Mota Vidal Nobrega
> > > M.sc. Candidate at UFCG
> > > B.sc. in Computer Science at UFCG
> > > Software Engineer at OpenStack Project - HP/LSD-UFCG
> >
> > --
> > ------------------------------------------
> > Telles Mota Vidal Nobrega
> > M.sc. Candidate at UFCG
> > B.sc. in Computer Science at UFCG
> > Software Engineer at OpenStack Project - HP/LSD-UFCG

--
------------------------------------------
Telles Mota Vidal Nobrega
M.sc. Candidate at UFCG
B.sc. in Computer Science at UFCG
Software Engineer at OpenStack Project - HP/LSD-UFCG
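
P.S. For reference, one possible way to handle the numeric key and value is sketched below. It assumes the string serde stays configured for the consumptions stream as in the properties quoted above, and that the serde reads UTF-8 text. The producer would write each number as its decimal text, and the task would parse the String it receives back into a number. NumericStringCodec and its methods are hypothetical names used only for illustration; they are not part of Samza or Kafka.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not a Samza or Kafka class): turns a number into the
// UTF-8 bytes of its decimal text and back, so a string serde can carry it.
final class NumericStringCodec {
    private NumericStringCodec() {}

    // Producer side: number -> bytes of its decimal representation.
    static byte[] encode(long value) {
        return Long.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: bytes of decimal text -> number.
    // Throws NumberFormatException if the payload is not a valid long.
    static long decode(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8).trim();
        return Long.parseLong(text);
    }

    public static void main(String[] args) {
        byte[] key = encode(17L);
        byte[] value = encode(-42L);
        System.out.println(decode(key) + " -> " + decode(value));
    }
}
```

Inside the task, the value handed over by the string serde is already a String, so only the Long.parseLong step would be needed there, and anything sent back out through the same serde would have to be a String as well (for example Long.toString(n)) rather than a HashMap.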
