Hi Chris, I really appreciate the time you are taking to help me out.
This is job.properties file
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=consumptions
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
task.class=alarm.ConsumptionProducer
task.inputs=kafka.consumptions
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1
# Serializers
serializers.registry.serde.class=org.apache.samza.serializers.StringSerdeFactory
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
*systems.kafka.samza.msg.serde=json*
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.metadata.broker.list=localhost:9092
systems.kafka.producer.producer.type=sync
# Normally, we'd set this much higher, but we want things to look snappy in
the demo.
systems.kafka.producer.batch.num.messages=1
*systems.kafka.streams.consumptions.key.serde=string*
*systems.kafka.streams.consumptions.msg.serde=string*
Does this look right?
I'm running a local cluster, I want to have it running nicely before I can
distribute it.
On Thu, Aug 7, 2014 at 3:08 PM, Chris Riccomini <
[email protected]> wrote:
> Hey Telles,
>
> Sure. In your job.properties file, define the serde:
>
> # Serializers
> serializers.registry.serde.class=org.apache.samza.serializers.StringSerdeFa
> ctory
>
>
> Then define the serde for your system:
>
> systems.kafka.samza.msg.serde=string
>
>
> Cheers,
> Chris
>
> On 8/7/14 10:54 AM, "Telles Nobrega" <[email protected]> wrote:
>
> >Can you give and example on how to use string serde, i'm getting an error
> >when trying to set to string
> >
> >:53:26:804Got system producers: Set(kafka)
> >17:53:26:809Got serdes: Set(string)
> >17:53:29:206Container container_1407433587782_0001_01_000017 failed with
> >exit code 1 - Exception from container-launch:
> >
> >
> >
> >On Thu, Aug 7, 2014 at 2:41 PM, Telles Nobrega <[email protected]>
> >wrote:
> >
> >> Thanks.
> >>
> >>
> >> On Thu, Aug 7, 2014 at 1:54 PM, Chris Riccomini <
> >> [email protected]> wrote:
> >>
> >>> Hey Telles,
> >>>
> >>> This is definitely a serde error. It sounds like your message is not
> >>> properly formatted as a JSON blob.
> >>>
> >>> If you are trying to just use a string as the message (vs. a well
> >>> formatted JSON blob), then you should use the StringSerde.
> >>>
> >>> Cheers,
> >>> Chris
> >>>
> >>> On 8/7/14 8:05 AM, "Telles Nobrega" <[email protected]> wrote:
> >>>
> >>> >Hi, I'm running a simple samza topology that reads from a kafka topic
> >>> >that
> >>> >only has two Strings
> >>> >xx:xx:xx:xxxx;xx
> >>> >And its throwing an error
> >>> >
> >>> >Caught exception in process loop.
> >>> >org.codehaus.jackson.JsonParseException: Unexpected character ('F'
> >>>(code
> >>> >70)): expected a valid value (number, String, array, object, 'true',
> >>> >'false' or 'null')
> >>> > at [Source: [B@56dfb465; line: 1, column: 2]
> >>> >at
> >>>org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
> >>> > at
> >>>
> >>>
> >>>>org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParser
> >>>>Min
> >>> >imalBase.java:385)
> >>> >at
> >>>
> >>>
> >>>>org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(J
> >>>>son
> >>> >ParserMinimalBase.java:306)
> >>> > at
> >>>
> >>>
> >>>>org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8S
> >>>>tre
> >>> >amParser.java:1581)
> >>> >at
> >>>
> >>>
> >>>>org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8St
> >>>>rea
> >>> >mParser.java:436)
> >>> > at
> >>>
> >>>
> >>>>org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.j
> >>>>ava
> >>> >:322)
> >>> >at
> >>>
> >>>
> >>>>org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java
> >>>>:24
> >>> >32)
> >>> > at
> >>>
> >>>
> >>>>org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.jav
> >>>>a:2
> >>> >389)
> >>> >at
> >>> org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
> >>> > at
> >>>org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
> >>> >at
> >>>
> >>>
> >>>>org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:
> >>>>115
> >>> >)
> >>> > at
> >>>
> >>>
> >>>>org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$syste
> >>>>m$S
> >>> >ystemConsumers$$poll$5.apply(SystemConsumers.scala:245)
> >>> > at
> >>>
> >>>
> >>>>org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$syste
> >>>>m$S
> >>> >ystemConsumers$$poll$5.apply(SystemConsumers.scala:242)
> >>> >at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>> >at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >>> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >>> >at org.apache.samza.system.SystemConsumers.org
> >>> >$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:242)
> >>> > at
> >>>
> >>>
> >>>>org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(S
> >>>>yst
> >>> >emConsumers.scala:180)
> >>> >at
> >>>
> >>>
> >>>>org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(S
> >>>>yst
> >>> >emConsumers.scala:180)
> >>> > at
> >>>
> >>>
> >>>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.s
> >>>>cal
> >>> >a:244)
> >>> >at
> >>>
> >>>
> >>>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.s
> >>>>cal
> >>> >a:244)
> >>> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>> >at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>> > at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
> >>> >at
> >>>scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >>> > at
> >>>
> >>>
> >>>>scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.sc
> >>>>ala
> >>> >:47)
> >>> >at scala.collection.SetLike$class.map(SetLike.scala:93)
> >>> > at scala.collection.AbstractSet.map(Set.scala:47)
> >>> >at
> >>>
> >>>
> >>>>org.apache.samza.system.SystemConsumers$$anon$1.call(SystemConsumers.sc
> >>>>ala
> >>> >:180)
> >>> > at
> >>>
> >>>>org.apache.samza.util.DoublingBackOff.maybeCall(DoublingBackOff.scala:4
> >>>>4)
> >>> >at
> >>>
> >>>>org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:20
> >>>>8)
> >>> > at org.apache.samza.container.RunLoop.process(RunLoop.scala:73)
> >>> >at org.apache.samza.container.RunLoop.run(RunLoop.scala:57)
> >>> > at
> >>>
> >>>>org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
> >>> >at
> >>>
> >>>>org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81
> >>>>)
> >>> > at
> >>>org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >>> >
> >>> >
> >>> >
> >>> >Has anyone experienced this error before?
> >>> >
> >>> >--
> >>> >------------------------------------------
> >>> >Telles Mota Vidal Nobrega
> >>> >M.sc. Candidate at UFCG
> >>> >B.sc. in Computer Science at UFCG
> >>> >Software Engineer at OpenStack Project - HP/LSD-UFCG
> >>>
> >>>
> >>
> >>
> >> --
> >> ------------------------------------------
> >> Telles Mota Vidal Nobrega
> >> M.sc. Candidate at UFCG
> >> B.sc. in Computer Science at UFCG
> >> Software Engineer at OpenStack Project - HP/LSD-UFCG
> >>
> >
> >
> >
> >--
> >------------------------------------------
> >Telles Mota Vidal Nobrega
> >M.sc. Candidate at UFCG
> >B.sc. in Computer Science at UFCG
> >Software Engineer at OpenStack Project - HP/LSD-UFCG
>
>
--
------------------------------------------
Telles Mota Vidal Nobrega
M.sc. Candidate at UFCG
B.sc. in Computer Science at UFCG
Software Engineer at OpenStack Project - HP/LSD-UFCG