Hey Michael, Hmm. I checked the SystemConsumers code, and nothing jumped out at me as broken. Could you paste your logs somewhere (pastebin/gist) with DEBUG-level logging enabled?
Cheers, Chris On Thu, Mar 19, 2015 at 6:52 AM, Michael Strobl < michael.str...@gameforge.com> wrote: > Hey there, > > I used the config setting "task.drop.deserialization.errors=true" to drop > all invalid JSON messages. It seems to work fine, in case of an ongoing > message stream. But the system will stuck when the last message was invalid > and the queue will be empty. Upcoming messages won't be processed until I > kill the task and start it again - then the new messages will be processed. > > > Works fine in case of: > > VALID MESSAGE > VALID MESSAGE > INVALID MESSAGE > VALID MESSAGE > VALID MESSAGE > > > But stuck when the last message was invalid: > > VALID MESSAGE > VALID MESSAGE > INVALID MESSAGE > ... (some time later)... > VALID MESSAGE <stuck> > VALID MESSAGE <stuck> > > > Can you please verify this behavior - I am not sure if the system stucks, > because the method "update" of the SystemConsumers class will return > "false" in case of an empty queue and the last message produced a serde > exception: > > https://github.com/apache/samza/blob/0.8.1/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala > > Thanks for helping! > > Cheers, > Michael Strobl >