A counterpoint thought: should a serialization/deserialization error be treated 
the same as an exception raised in a task's `process` method? A dead-letter 
queue could be a useful mechanism for dealing with all message processing 
failures, not just deserialization errors. In that case, the first way 
(configured options for handling the exception) would give a nice symmetry 
between different kinds of errors.

The second way could achieve the same thing, but would require some boilerplate 
code along the lines of:

public void process(...) {
  if (message not deserialized correctly) {
    deadLetter.emit(message);
  } else {
    try {
      doStuff(message);
    } catch (Exception e) {
      deadLetter.emit(message, e);
    }
  }
}
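
To make that boilerplate concrete, here is a minimal self-contained sketch. None of these types are Samza APIs: Envelope, DeadLetter, the deserializationFailed flag, and the in-memory dead-letter list are hypothetical stand-ins for whatever the framework would end up providing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch only: these are NOT Samza classes.
public class DeadLetterSketch {

    /** A message plus a flag saying whether deserialization failed (assumed shape). */
    static final class Envelope {
        final Object message;
        final boolean deserializationFailed;

        Envelope(Object message, boolean deserializationFailed) {
            this.message = message;
            this.deserializationFailed = deserializationFailed;
        }
    }

    /** A dead-lettered envelope and, for processing failures, the exception that caused it. */
    static final class DeadLetter {
        final Envelope envelope;
        final Exception cause; // null when the failure happened during deserialization

        DeadLetter(Envelope envelope, Exception cause) {
            this.envelope = envelope;
            this.cause = cause;
        }
    }

    /** Routes both kinds of failure to the same dead-letter sink. */
    static void process(Envelope envelope, Consumer<Object> doStuff, List<DeadLetter> deadLetters) {
        if (envelope.deserializationFailed) {
            deadLetters.add(new DeadLetter(envelope, null));
            return;
        }
        try {
            doStuff.accept(envelope.message);
        } catch (Exception e) {
            deadLetters.add(new DeadLetter(envelope, e));
        }
    }

    public static void main(String[] args) {
        List<DeadLetter> deadLetters = new ArrayList<>();
        // Stand-in task logic that fails on one particular message.
        Consumer<Object> doStuff = m -> {
            if ("boom".equals(m)) {
                throw new IllegalStateException("processing failed");
            }
        };

        process(new Envelope("ok", false), doStuff, deadLetters);        // processed normally
        process(new Envelope(new byte[0], true), doStuff, deadLetters);  // bad serde input
        process(new Envelope("boom", false), doStuff, deadLetters);      // exception in task logic

        System.out.println(deadLetters.size()); // prints 2
    }
}
```

Every task that wanted this behavior would have to repeat the same wrapper, which is the argument for letting configuration handle it instead.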


On 4 Feb 2014, at 17:46, Chris Riccomini <[email protected]> wrote:
> Hey Danny,
> 
> I can think of two ways to accomplish this.
> 
> The first way is essentially what you've described. Allow the framework to
> have a pre-defined set of options defined via config (drop the message,
> re-route to another topic, etc).
> 
> The second way is to catch the serialization issues, and still pass the
> failed message to the StreamTask. The way in which the StreamTask would be
> notified of the failure is up for debate. One option would be to have an
> ErrorTask interface, which has an onDeserializationError callback. No new
> configuration would be required in this case--simply implementing the
> ErrorTask means you get notified of any serialization errors (rather than
> failing the container, which would be the default). Another option would
> be to have the IncomingMessageEnvelope have an error flag, which we could
> use to denote the serialization failure.
> 
> I like the second approach because it's more generic. Instead of
> pre-defining exact behavior when a failure occurs, it seems more flexible
> to let the StreamTask know, and let the developer decide what the
> appropriate action is. I hadn't even thought of the re-route case you
> brought up, and I'm sure there are many other possible actions that we're
> not thinking of right now. Of the second approach's potential
> implementation options, I favor the ErrorTask approach right now, but I
> haven't dug into it too much.
> 
> Regardless of which way we choose, I think the default should be to fail
> the container. This is the safest behavior, as it means there will be no
> data loss, and developers will be alerted of the serialization error.
> 
> As far as implementation goes, the four classes that are probably of most
> interest to you are SamzaContainer, TaskInstance, StreamConsumers, and
> SerdeManager. You'll need to catch the serde exceptions somewhere in the
> StreamConsumer or SerdeManager class, and implement your new logic there.
> I think the best approach is to read over these four classes, and ask us
> any questions you might have.
> 
> Cheers,
> Chris
> 
> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:
> 
>> We have discussed this, and it is something that we want to look into.
>> 
>> Do you have any thoughts on how to implement this feature?
>> I assume you would want the failure behavior to be configurable.
>> 
>> Like
>> Drop the message,
>> Send a message to a new queue, and drop.
>> Fail the container (is that ever appropriate?)
>> Anything else?
>> 
>> I am not familiar with this code base.
>> Do you have a suggestion on what classes I should be looking to modify?
>> 
>> Is there someone who I should bounce ideas off of?
>> 
>> 
>> Thanks
>> 
>> 
>> Danny
>> 
>> 
>> 
>> On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
>> 
>>> It's not intentional, error handling just hasn't been added yet.  If
>>> you're interested, we'd love to have the contribution.  In particular,
>>> take a look at SAMZA-59 (https://issues.apache.org/jira/browse/SAMZA-59),
>>> which also touches on how serdes should handle error conditions.
>>> 
>>> Thanks,
>>> Jakob
>>> 
>>> 
>>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
>>> <[email protected]> wrote:
>>> 
>>>> I am currently using the JsonSerde/JsonSerdeFactory classes for
>>>> serializing kafka messages.
>>>> 
>>>> I have noticed that if there is bad json input coming in through kafka,
>>>> the samza container seems to crash.
>>>> 
>>>> I was looking at JsonSerde.scala, which does not seem to have any error
>>>> handling.
>>>> 
>>>> So I was curious if this was intentional?
>>>> Or if there is a different way to handle these types of input errors?
>>>> 
>>>> 
>>>> Thanks
>>>> 
>>>> 
>>>> Danny
>>>> 
>>> 
> 
