Hi Chris,

What is your job's configuration, and what is your custom serde class? From the stack trace, it seems that the SerdeManager picked up JsonSerde during deserialization. Were you expecting a different serde class to be used for this topic?
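For reference, a serde is usually registered under a name and then bound to a system or a stream in the job's properties file. A minimal sketch, assuming a system named `kafka`, a topic named `events`, and a serde factory class `com.example.AvroSerdeFactory` (all three are placeholder names, not taken from your job):

```properties
# Register the custom serde under the name "avro" (placeholder factory class).
serializers.registry.avro.class=com.example.AvroSerdeFactory

# Bind it to a single stream (placeholder system "kafka", stream "events").
systems.kafka.streams.events.samza.msg.serde=avro

# A system-wide default like this one applies to streams without their own binding.
systems.kafka.samza.msg.serde=json
```

If the stream-level line is missing or names a different topic, the system-level default would apply instead, which might explain JsonSerde showing up in the trace. That is only a guess until we see the actual config.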
Thanks!

-Yi

On Wed, Sep 23, 2015 at 12:33 PM, Chris Bonnell <[email protected]> wrote:

> We're decoding avro files as part of our samza ecosystem, and we came
> across an error that has been a bit of a show-stopper. We have a kafka
> topic of avro-encoded events, and a custom serde which does work.
>
> However, we were consuming from this stream with the serde in place and
> encountered the following error:
>
> org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>     at scala.collection.AbstractSet.map(Set.scala:47)
>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
>     at org.apache.samza.system.SystemConsumers.start(SystemConsumers.scala:163)
>     at org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:626)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:546)
>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character (':' (code 58)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
>  at [Source: [B@28b93ffb; line: 1, column: 2]
>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1582)
>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:437)
>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:323)
>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
>     ... 19 more
>
> This error indicates that our serde (and its error-catching behaviors) were
> being totally circumvented but nothing in our code supports that this
> should be the case, and having this error is enough to stop the samza task,
> leaving us with the options to either have the error perpetually or lose
> data by moving the offset to current.
>
> Any idea how or why this could happen?
>
> Thanks,
> Chris
