Hi,
I am using json serde to receive json strings from kafka and
automatically convert it to LinkedHashMap. I am using the following
code fragment to achieve this:
LinkedHashMap<String, Object> rawEvent = (LinkedHashMap<String,
Object>) envelope.getMessage();
I am now getting an exception when samsa encounters a bad json string
from kafka.
16/11/22 11:33:14 ERROR container.SamzaContainer: Caught exception in
process loop.
org.apache.samza.system.SystemConsumersException: Cannot deserialize
an incoming message.
at
org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:303)
at
org.apache.samza.system.SystemConsumers.tryUpdate(SystemConsumers.scala:270)
.
.
.
.
Caused by: org.codehaus.jackson.JsonParseException: Invalid UTF-8
middle byte 0x6e
at [Source: [B@7ce97ee5; line: 1, column: 811]
at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
at
org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
at
org.codehaus.jackson.impl.Utf8StreamParser._reportInvalidOther(Utf8StreamParser.java:2244)
.
.
.
.
Since the exceptions seems to be raised in scala code, how best to
catch such exceptions and deal with it?
Thanks and Regards,
Raj