A counterpoint thought: should a serialization/deserialization error be treated
the same as an exception raised in a task's `process` method? A dead-letter
queue could be a useful mechanism for dealing with all message processing
failures, not just deserialization errors. In that case, the first way
(configured options for handling the exception) would give a nice symmetry
between different kinds of errors.

The second way could achieve the same thing, but would require some boilerplate
code along the lines of:

    public void process(...) {
      if (message not deserialized correctly) {
        deadLetter.emit(message);
      } else {
        try {
          doStuff(message);
        } catch (Exception e) {
          deadLetter.emit(message, e);
        }
      }
    }

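For comparison, here is a rough sketch of what the first way might look like if a single configured policy covered both deserialization errors and exceptions thrown during processing. All names here (Envelope, FailurePolicy, DeadLetterSink, GuardedProcessor) are made up for illustration and are not existing Samza classes; a real version would presumably read the policy from the job config and emit to an actual stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative only: none of these types exist in Samza.
class DeadLetterSketch {

    /** What to do when a message cannot be deserialized or processed. */
    enum FailurePolicy { FAIL, DROP, DEAD_LETTER }

    /** Hypothetical envelope: carries either a payload or the serde error. */
    static final class Envelope {
        final Object payload;
        final Exception serdeError;

        private Envelope(Object payload, Exception serdeError) {
            this.payload = payload;
            this.serdeError = serdeError;
        }

        static Envelope ok(Object payload) { return new Envelope(payload, null); }
        static Envelope failed(Exception serdeError) { return new Envelope(null, serdeError); }
    }

    /** Hypothetical destination for messages that could not be handled. */
    interface DeadLetterSink {
        void emit(Envelope envelope, Exception cause);
    }

    /** Stand-in sink that just remembers the failure causes. */
    static final class InMemorySink implements DeadLetterSink {
        final List<Exception> causes = new ArrayList<>();

        @Override
        public void emit(Envelope envelope, Exception cause) {
            causes.add(cause);
        }
    }

    /** Applies one policy to both kinds of failure, so tasks need no boilerplate. */
    static final class GuardedProcessor {
        private final FailurePolicy policy;
        private final DeadLetterSink sink;
        private final Consumer<Object> task;

        GuardedProcessor(FailurePolicy policy, DeadLetterSink sink, Consumer<Object> task) {
            this.policy = policy;
            this.sink = sink;
            this.task = task;
        }

        void process(Envelope envelope) {
            Exception failure = envelope.serdeError;
            if (failure == null) {
                try {
                    task.accept(envelope.payload);
                    return; // processed normally
                } catch (Exception e) {
                    failure = e; // treat like a serde failure below
                }
            }
            switch (policy) {
                case DROP:
                    return;
                case DEAD_LETTER:
                    sink.emit(envelope, failure);
                    return;
                default: // FAIL: propagate, which would take the container down
                    throw new RuntimeException("message handling failed", failure);
            }
        }
    }

    public static void main(String[] args) {
        InMemorySink sink = new InMemorySink();
        GuardedProcessor processor = new GuardedProcessor(
            FailurePolicy.DEAD_LETTER, sink,
            payload -> {
                if ("boom".equals(payload)) {
                    throw new IllegalStateException("processing failed");
                }
            });
        processor.process(Envelope.ok("fine"));
        processor.process(Envelope.ok("boom"));
        processor.process(Envelope.failed(new RuntimeException("bad json")));
        System.out.println("dead-lettered: " + sink.causes.size());
    }
}
```

With FAIL as the default, this keeps the "fail the container" behaviour unless a job opts into something else.
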
On 4 Feb 2014, at 17:46, Chris Riccomini <[email protected]> wrote:
> Hey Danny,
>
> I can think of two ways to accomplish this.
>
> The first way is essentially what you've described. Allow the framework to
> have a pre-defined set of options defined via config (drop the message,
> re-route to another topic, etc).
>
> The second way is to catch the serialization issues, and still pass the
> failed message to the StreamTask. The way in which the StreamTask would be
> notified of the failure is up for debate. One option would be to have an
> ErrorTask interface, which has an onDeserializationError callback. No new
> configuration would be required in this case--simply implementing the
> ErrorTask means you get notified of any serialization errors (rather than
> failing the container, which would be the default). Another option would
> be to have the IncomingMessageEnvelope have an error flag, which we could
> use to denote the serialization failure.
>
> I like the second approach because it's more generic. Instead of
> pre-defining exact behavior when a failure occurs, it seems more flexible
> to let the StreamTask know, and let the developer decide what the
> appropriate action is. I hadn't even thought of the re-route case you
> brought up, and I'm sure there are many other possible actions that we're
> not thinking of right now. Of the second approach's potential
> implementation options, I favor the ErrorTask approach right now, but I
> haven't dug into it too much.
>
> Regardless of which way we choose, I think the default should be to fail
> the container. This is the safest behavior, as it means there will be no
> data loss, and developers will be alerted of the serialization error.
>
> As far as implementation goes, the four classes that are probably of most
> interest to you are SamzaContainer, TaskInstance, StreamConsumers, and
> SerdeManager. You'll need to catch the serde exceptions somewhere in the
> StreamConsumer or SerdeManager class, and implement your new logic there.
> I think the best approach is to read over these four classes, and ask us
> any questions you might have.
>
> Cheers,
> Chris
>
> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:
>
>> We have discussed this, and it is something that we want to look into.
>>
>> Do you have any thoughts on how to implement this feature?
>> I assume you would want the failure behavior to be configurable.
>>
>> Like
>> Drop the message,
>> Send a message to a new queue, and drop.
>> Fail the container (is that ever appropriate?)
>> Anything else?
>>
>> I am not familiar with this code base.
>> Do you have a suggestion on what classes I should be looking to modify?
>>
>> Is there someone who I should bounce ideas off of?
>>
>>
>> Thanks
>>
>>
>> Danny
>>
>>
>>
>> On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
>>
>>> It's not intentional, error handling just hasn't been added yet. If
>>> you're interested, we'd love to have the contribution. In particular,
>>> take a look at SAMZA-59 (https://issues.apache.org/jira/browse/SAMZA-59),
>>> which also touches on how serdes should handle error conditions.
>>>
>>> Thanks,
>>> Jakob
>>>
>>>
>>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
>>> <[email protected]> wrote:
>>>
>>>> I am currently using the JsonSerde/JsonSerdeFactory classes for
>>>> serializing kafka messages.
>>>>
>>>> I have noticed that if there is bad json input coming in through
>>>> kafka, the samza container seems to crash.
>>>>
>>>> I was looking at JsonSerde.scala, which does not seem to have any
>>>> error handling.
>>>>
>>>> So I was curious if this was intentional?
>>>> Or if there is a different way to handle these types of input errors?
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>> Danny
>>>>
>>>
>