Hey there,

I am using the config setting "task.drop.deserialization.errors=true" to drop all
invalid JSON messages. This works fine for a continuous message stream. However,
the job gets stuck when the last message was invalid and the queue is empty
afterwards: subsequent messages are not processed until I kill the task and
restart it. After the restart, the new messages are processed.


It works fine in this case:

VALID MESSAGE
VALID MESSAGE
INVALID MESSAGE
VALID MESSAGE
VALID MESSAGE


But it gets stuck when the last message was invalid:

VALID MESSAGE
VALID MESSAGE
INVALID MESSAGE
... (some time later)...
VALID MESSAGE <stuck>
VALID MESSAGE <stuck>


Can you please verify this behavior? I suspect the system gets stuck because
the "update" method of the SystemConsumers class returns "false" when the queue
is empty and the last message produced a serde exception:
https://github.com/apache/samza/blob/0.8.1/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
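To make the suspicion concrete, here is a toy model in Java of the control flow I have in mind. It is NOT Samza's actual SystemConsumers code: the class StuckConsumerModel, the needsFetch flag, runOnce() and the isValid() check are all hypothetical stand-ins. The assumption being modelled is that a partition is only re-enabled for fetching after a successful delivery, so dropping the last buffered message leaves it disabled:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of the suspected behavior. NOT Samza's SystemConsumers code.
public class StuckConsumerModel {
    // Messages waiting on the broker side.
    final Deque<String> broker = new ArrayDeque<>();
    // Messages already fetched but not yet handed to the task.
    final Deque<String> buffer = new ArrayDeque<>();
    // Messages the task actually processed.
    final List<String> processed = new ArrayList<>();
    // Hypothetical flag: true means the partition will be fetched from again.
    boolean needsFetch = true;

    // Stand-in for deserialization: anything not starting with '{' is "invalid JSON".
    static boolean isValid(String msg) {
        return msg.startsWith("{");
    }

    // Stand-in for update(): delivers the next valid buffered message,
    // silently dropping invalid ones (as with task.drop.deserialization.errors=true).
    // Returns false if the buffer ran empty without delivering anything.
    boolean update() {
        while (!buffer.isEmpty()) {
            String msg = buffer.poll();
            if (isValid(msg)) {
                processed.add(msg);
                return true;
            }
        }
        return false;
    }

    // One iteration of the (hypothetical) run loop.
    void runOnce() {
        if (needsFetch && !broker.isEmpty()) {
            // Fetch everything currently available in one batch.
            while (!broker.isEmpty()) {
                buffer.add(broker.poll());
            }
            needsFetch = false;
        }
        boolean delivered = update();
        if (delivered && buffer.isEmpty()) {
            // Only a successful delivery that drains the buffer re-enables fetching.
            needsFetch = true;
        }
        // Suspected bug: if update() returned false after dropping the last
        // buffered message, needsFetch stays false and new messages are never fetched.
    }

    public static void main(String[] args) {
        StuckConsumerModel m = new StuckConsumerModel();
        m.broker.addAll(List.of("{1}", "{2}", "oops"));
        for (int i = 0; i < 5; i++) m.runOnce();
        // Some time later, new valid messages arrive.
        m.broker.addAll(List.of("{3}", "{4}"));
        for (int i = 0; i < 5; i++) m.runOnce();
        System.out.println(m.processed); // prints [{1}, {2}]
    }
}
```

In this model, the first sequence above (valid messages after the invalid one) still drains the buffer through a successful delivery and keeps consuming; only a trailing invalid message leaves needsFetch false, which matches what I observe.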

Thanks for helping!

Cheers,
Michael Strobl
