[ https://issues.apache.org/jira/browse/SAMZA-608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14370300#comment-14370300 ]
Chris Riccomini commented on SAMZA-608:
---------------------------------------
I believe I understand the issue now. The problem is that we assume that
update() will always add a message to the MessageChooser here:
{code}
if (emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).remove(systemStreamPartition)) {
  update(systemStreamPartition)
}
{code}
This was a valid assumption before we started dropping deserialization
exceptions. Now, if deserialization exceptions are ignored, we remove the SSP
from {{emptySystemStreamPartitionsBySystem}} and call update(), but the
MessageChooser never receives a message for that SSP, because the message was
dropped when it failed to deserialize. Once this assumption breaks, the
SystemConsumers.choose method will never choose this SSP again, which means the
SSP queue will never be drained. If it's never drained, it'll never get added
to {{emptySystemStreamPartitionsBySystem}} again.
Because this code is so critical, we should be careful about how we fix this. I
*think* we should just be able to wrap the update() call in an if() that checks
whether update() returned false and, if so, adds the SSP back to
{{emptySystemStreamPartitionsBySystem}}. Returning false here means that we
gave the update method a queue with messages, but it was unable to add any to
the MessageChooser. If that's the case, the queue must be empty, since the loop
in update() ({{while (q.size > 0 && !updated)}}) only exits once a message has
been added or the queue has run out. The SSP should therefore be added back to
{{emptySystemStreamPartitionsBySystem}}.
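To make the proposed check concrete, here is a minimal, self-contained sketch of the idea. It is not the actual SystemConsumers code: the class {{EmptySspSketch}}, the {{refresh}} method, the flat {{emptySsps}} set (standing in for the per-system {{emptySystemStreamPartitionsBySystem}} map), and the use of {{None}} to model a message dropped on a deserialization error are all assumptions made for illustration.

```scala
import scala.collection.mutable

// Hypothetical model of the bookkeeping described above, not Samza's real classes.
class EmptySspSketch {
  type Ssp = String

  // Per-SSP buffered messages. None models a message that was dropped
  // because it failed to deserialize (an assumption of this sketch).
  val queues = mutable.Map[Ssp, mutable.Queue[Option[String]]]()

  // Flat stand-in for emptySystemStreamPartitionsBySystem.
  val emptySsps = mutable.Set[Ssp]()

  // Stand-in for the MessageChooser: just records what it was given.
  val chooser = mutable.ListBuffer[String]()

  // Mirrors the loop quoted in the comment: keep dequeuing until one
  // message reaches the chooser or the queue runs out.
  def update(ssp: Ssp): Boolean = {
    val q = queues(ssp)
    var updated = false
    while (q.nonEmpty && !updated) {
      q.dequeue() match {
        case Some(msg) =>
          chooser += msg
          updated = true
        case None => // dropped message, try the next one
      }
    }
    updated
  }

  // The proposed fix: if update() could not hand anything to the chooser,
  // the queue is empty again, so mark the SSP as empty once more.
  def refresh(ssp: Ssp): Unit = {
    if (emptySsps.remove(ssp)) {
      if (!update(ssp)) {
        emptySsps += ssp
      }
    }
  }
}
```

With a queue that holds only dropped messages, {{refresh}} leaves the SSP in the empty set, so it stays eligible for a later refill instead of being forgotten. With at least one good message, the SSP leaves the empty set and the chooser receives that message.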
> Deserialization error causes SystemConsumers to hang
> ----------------------------------------------------
>
> Key: SAMZA-608
> URL: https://issues.apache.org/jira/browse/SAMZA-608
> Project: Samza
> Issue Type: Task
> Components: container
> Affects Versions: 0.9.0
> Reporter: Chris Riccomini
> Fix For: 0.10.0
>
> Attachments: SAMZA-608-hello-samza.diff
>
>
> SamzaContainers seem to wedge if malformed messages are sent to them, even if
> {{task.drop.deserialization.errors=true}}. This was initially raised on the
> mailing list
> [here|http://mail-archives.apache.org/mod_mbox/samza-dev/201503.mbox/%3C94CF0C2E22F96641A963FB3B6D2911213A2715B2%40GF-KA-EX01.gf.local%3E].