So it sounds like this would just be a try/catch around this line in
SystemConsumers.scala:

    unprocessedMessages(envelope.getSystemStreamPartition).enqueue(serdeManager.fromBytes(envelope))

And just do some logging in the catch statement?


On Wed, Feb 5, 2014 at 12:32 PM, Chris Riccomini <[email protected]> wrote:

> Hey Guys,
>
> Thinking on this more. Given that we don't have a good grasp on this, and
> it seems fairly complicated for now, I'm proposing that we hold off on all
> this complex work, and just support the ability to drop messages for now.
> This should be pretty straightforward to implement, and is generally
> pretty useful, and not too invasive.
>
> Does that sound cool with folks? Danny?
>
> Cheers,
> Chris
>
> On 2/5/14 11:43 AM, "Chris Riccomini" <[email protected]> wrote:
>
> >Hey Martin,
> >
> >I agree with you about ordering. It would be ideal to give the error()
> >callback exactly when we would normally call process() on the message, but
> >can't because of the serde error. I think this is do-able if we tweak
> >things in SystemConsumers a bit.
> >
> >The problem I'm having trouble resolving is how to handle the
> >MessageChooser. The MessageChooser chooses the processing order between
> >partitions ("I have a message from stream/partition X, and
> >stream/partition Y, which one do I process next?"). These choosers
> >occasionally look at the actual message payload (e.g. the timestamp field
> >of an incoming message) when doing the choosing. The chooser won't know
> >how to handle an IncomingMessageEnvelope where the key and value are both
> >byte[], when it's expecting Avro, ProtoBuf, JSON, etc. One solution I can
> >think of would be to add an error() call to the MessageChooser interface.
> >The other solution would be to just give the MessageChooser everything,
> >and in cases where it is looking at the message payload, it just has to be
> >careful. I think there might be other solutions here as well.
> >
> >More thought required.
> >
> >Cheers,
> >Chris
> >
> >On 2/4/14 4:13 PM, "Martin Kleppmann" <[email protected]> wrote:
> >
> >>Ok -- I had originally misunderstood your ErrorTask proposal. The way you
> >>described it looks good to me. I agree that a single error method, which
> >>takes a generic error envelope object, is better than lots of different
> >>methods.
> >>
> >>Rather than
> >>
> >>    if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode()))
> >>
> >>could we just say this?
> >>
> >>    if (envelope.getError() instanceof DeserializationError)
> >>
> >>Regarding ordering: my instinctive reaction is that it would be least
> >>surprising if the error is received at the same point in the message
> >>sequence as the message would have been received, had it not been an
> >>error. But that may need some more thinking.
> >>
> >>Cheers,
> >>Martin
> >>
> >>On 4 Feb 2014, at 22:23, Chris Riccomini <[email protected]> wrote:
> >>
> >>> Hey Guys,
> >>>
> >>> One other thing to consider here, that didn't strike me at first, is how
> >>> to handle ordering when a serialization error occurs.
> >>>
> >>> Right now, the SystemConsumers class reads messages in, buffers them,
> >>> and feeds them slowly to the MessageChooser. The MessageChooser takes
> >>> only IncomingMessageEnvelope right now. So the question is, if we have a
> >>> serde error, when does the StreamTask see it? It shouldn't see it before
> >>> other messages for the same stream partition, since this would mean
> >>> we've broken the ordering of the messages. Arguably, the MessageChooser
> >>> should also get a chance to see it, to choose in what order to process
> >>> it.
> >>>
> >>> One could make the argument, though, that a message that can't be
> >>> deserialized is invalid, and you're looking at potential data loss
> >>> anyway, so ordering doesn't matter. I'm not sure if this assumption is
> >>> true in all cases though.
> >>>
> >>> I don't really have a good answer for this at the moment. I think we
> >>> should think about it.
> >>>
> >>> Cheers,
> >>> Chris
> >>>
> >>> On 2/4/14 11:36 AM, "Danny Antonetti" <[email protected]> wrote:
> >>>
> >>>> OK Great, that looks good.
> >>>>
> >>>> I will look into this as I have time and get back to you with
> >>>> questions.
> >>>>
> >>>> Thanks for the help.
> >>>>
> >>>> Danny
> >>>>
> >>>> On Tue, Feb 4, 2014 at 11:29 AM, Chris Riccomini <[email protected]> wrote:
> >>>>
> >>>>> Hey Guys,
> >>>>>
> >>>>> Here's a pseudo-code proposal for an error API.
> >>>>>
> >>>>>   interface ErrorTask {
> >>>>>     public void error(ErrorEnvelope envelope, MessageCollector collector,
> >>>>>         TaskCoordinator coordinator);
> >>>>>   }
> >>>>>
> >>>>>   interface ErrorEnvelope {
> >>>>>     ErrorCode getErrorCode();
> >>>>>     Object getError();
> >>>>>   }
> >>>>>
> >>>>>   class DeserializationError {
> >>>>>     byte[] getKey();
> >>>>>     byte[] getMessage();
> >>>>>     boolean keyFailed();
> >>>>>     boolean valueFailed();
> >>>>>   }
> >>>>>
> >>>>> Then you could implement something like:
> >>>>>
> >>>>>   class MyStreamTask implements StreamTask, ErrorTask {
> >>>>>     public void process(...) {
> >>>>>       // do stuff
> >>>>>     }
> >>>>>
> >>>>>     public void error(ErrorEnvelope envelope, MessageCollector collector,
> >>>>>         TaskCoordinator coordinator) {
> >>>>>       if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())) {
> >>>>>         DeserializationError error = (DeserializationError) envelope.getError();
> >>>>>         collector.send("kafka", "bad-data", error.getKey(), error.getMessage());
> >>>>>       }
> >>>>>     }
> >>>>>   }
> >>>>>
> >>>>> Arguably, we've just moved our big if-statement block from the
> >>>>> process() method into the ErrorTask.error method, but the alternative,
> >>>>> as I see it, is to have one callback per error message.
> >>>>> This moves us into a state where every time we want to add a new
> >>>>> callback, we break all existing implementations (since they must now
> >>>>> implement the new method, as well). I chose the single callback and
> >>>>> generic object approach because it matches what our process method
> >>>>> does when multiple input streams are defined (if envelope is from
> >>>>> stream A, do X, if envelope is from stream B, do Y), but I'm open to
> >>>>> suggestions if people have them.
> >>>>>
> >>>>> Regarding your JSON use case, yes, I think you'd be given the raw
> >>>>> bytes for the key and message, and it'd be up to you to decide what to
> >>>>> do with them. Samza allows you to define systems with no serde.
> >>>>> Systems without a serde default to byte[] for both key and value,
> >>>>> which means that you could define a system (or stream) with a
> >>>>> pass-through serde, and send it the raw bytes for the key and message.
> >>>>> Alternatively, you could try decoding the data using some other serde,
> >>>>> posting the message to a DB, logging the message in Log4j, discarding
> >>>>> the message, etc.
> >>>>>
> >>>>> Cheers,
> >>>>> Chris
> >>>>>
> >>>>> On 2/4/14 10:48 AM, "Danny Antonetti" <[email protected]> wrote:
> >>>>>
> >>>>>> Hey Chris,
> >>>>>>
> >>>>>> I am not sure I understand your suggestion about the ErrorTask. What
> >>>>>> would this new function's method signature be? I would assume it
> >>>>>> would take in the byte[] from the fromBytes function. It seems like
> >>>>>> that ties the Serde implementation to the StreamTask implementation.
> >>>>>> Unless you are suggesting that it should be notified without giving
> >>>>>> the input byte[].
> >>>>>>
> >>>>>> In our case we are using the JsonSerde, so the byte[] for the json
> >>>>>> data would be given to onDeserializationError and then our task would
> >>>>>> have to decode the bytes?
> >>>>>>
> >>>>>> Just to clarify my suggestion, I was not thinking that we would have
> >>>>>> a predefined set of behaviors. I was thinking that I would have an
> >>>>>> interface (that would not be part of the StreamTask), maybe
> >>>>>> ErrorHandler. With this option there would be implementations of that
> >>>>>> interface for dropping the message, redirect to a new queue, or drop
> >>>>>> it. But this would require extra configuration as you mentioned.
> >>>>>>
> >>>>>> I am not invested in my approach, I just wanted to make sure that I
> >>>>>> understand the suggestions/options.
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>> Danny
> >>>>>>
> >>>>>> On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini <[email protected]> wrote:
> >>>>>>
> >>>>>>> Hey Danny,
> >>>>>>>
> >>>>>>> I can think of two ways to accomplish this.
> >>>>>>>
> >>>>>>> The first way is essentially what you've described. Allow the
> >>>>>>> framework to have a pre-defined set of options defined via config
> >>>>>>> (drop the message, re-route to another topic, etc).
> >>>>>>>
> >>>>>>> The second way is to catch the serialization issues, and still pass
> >>>>>>> the failed message to the StreamTask. The way in which the
> >>>>>>> StreamTask would be notified of the failure is up for debate. One
> >>>>>>> option would be to have an ErrorTask interface, which has an
> >>>>>>> onDeserializationError callback.
> >>>>>>> No new configuration would be required in this case--simply
> >>>>>>> implementing the ErrorTask means you get notified of any
> >>>>>>> serialization errors (rather than failing the container, which would
> >>>>>>> be the default). Another option would be to have the
> >>>>>>> IncomingMessageEnvelope have an error flag, which we could use to
> >>>>>>> denote the serialization failure.
> >>>>>>>
> >>>>>>> I like the second approach because it's more generic. Instead of
> >>>>>>> pre-defining exact behavior when a failure occurs, it seems more
> >>>>>>> flexible to let the StreamTask know, and let the developer decide
> >>>>>>> what the appropriate action is. I hadn't even thought of the
> >>>>>>> re-route case you brought up, and I'm sure there are many other
> >>>>>>> possible actions that we're not thinking of right now. Of the second
> >>>>>>> approach's potential implementation options, I favor the ErrorTask
> >>>>>>> approach right now, but I haven't dug into it too much.
> >>>>>>>
> >>>>>>> Regardless of which way we choose, I think the default should be to
> >>>>>>> fail the container. This is the safest behavior, as it means there
> >>>>>>> will be no data loss, and developers will be alerted of the
> >>>>>>> serialization error.
> >>>>>>>
> >>>>>>> As far as implementation goes, the four classes that are probably of
> >>>>>>> most interest to you are SamzaContainer, TaskInstance,
> >>>>>>> SystemConsumers, and SerdeManager. You'll need to catch the serde
> >>>>>>> exceptions somewhere in the SystemConsumers or SerdeManager class,
> >>>>>>> and implement your new logic there. I think the best approach is to
> >>>>>>> read over these four classes, and ask us any questions you might
> >>>>>>> have.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Chris
> >>>>>>>
> >>>>>>> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> We have discussed this, and it is something that we want to look
> >>>>>>>> into.
> >>>>>>>>
> >>>>>>>> Do you have any thoughts on how to implement this feature?
> >>>>>>>> I assume you would want the failure behavior to be configurable.
> >>>>>>>>
> >>>>>>>> Like
> >>>>>>>> Drop the message,
> >>>>>>>> Send a message to a new queue, and drop.
> >>>>>>>> Fail the container (is that ever appropriate?)
> >>>>>>>> Anything else?
> >>>>>>>>
> >>>>>>>> I am not familiar with this code base.
> >>>>>>>> Do you have a suggestion on what classes I should be looking to
> >>>>>>>> modify?
> >>>>>>>>
> >>>>>>>> Is there someone who I should bounce ideas off of?
> >>>>>>>>
> >>>>>>>> Thanks
> >>>>>>>>
> >>>>>>>> Danny
> >>>>>>>>
> >>>>>>>> On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>>> It's not intentional, error handling just hasn't been added yet.
> >>>>>>>>> If you're interested, we'd love to have the contribution. In
> >>>>>>>>> particular, take a look at SAMZA-59
> >>>>>>>>> (https://issues.apache.org/jira/browse/SAMZA-59), which also
> >>>>>>>>> touches on how serdes should handle error conditions.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Jakob
> >>>>>>>>>
> >>>>>>>>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti <[email protected]> wrote:
> >>>>>>>>>
> >>>>>>>>>> I am currently using the JsonSerde/JsonSerdeFactory classes for
> >>>>>>>>>> serializing kafka messages.
> >>>>>>>>>>
> >>>>>>>>>> I have noticed that if there is bad json input coming in through
> >>>>>>>>>> kafka, the samza container seems to crash.
> >>>>>>>>>>
> >>>>>>>>>> I was looking at JsonSerde.scala, which does not seem to have any
> >>>>>>>>>> error handling.
> >>>>>>>>>>
> >>>>>>>>>> So I was curious if this was intentional?
> >>>>>>>>>> Or if there is a different way to handle these types of input
> >>>>>>>>>> errors?
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>>> Danny
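
A minimal sketch of the "just drop the message" option discussed in this thread, written in plain Java rather than against Samza's actual classes. DroppingBuffer, offer, and the toy parse function are hypothetical names made up for illustration; the deserializer argument only stands in for serdeManager.fromBytes, and the queue for unprocessedMessages. The idea is simply that a failure during deserialization is caught and logged and the message is skipped, so the remaining messages keep their relative order:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical stand-in for the buffering done in SystemConsumers; not Samza's API.
final class DroppingBuffer<T> {
    private static final Logger LOG = Logger.getLogger(DroppingBuffer.class.getName());

    // Plays the role of serdeManager.fromBytes in the thread (an assumption).
    private final Function<byte[], T> deserializer;
    private final Queue<T> unprocessed = new ArrayDeque<>();
    private long dropped = 0;

    DroppingBuffer(Function<byte[], T> deserializer) {
        this.deserializer = deserializer;
    }

    /** Deserializes and enqueues one raw message; returns false if it was dropped. */
    boolean offer(byte[] raw) {
        try {
            unprocessed.add(deserializer.apply(raw));
            return true;
        } catch (RuntimeException e) {
            // Log and skip the bad message instead of letting the exception escape.
            dropped++;
            LOG.warning("Dropping message that failed to deserialize: " + e);
            return false;
        }
    }

    T poll() { return unprocessed.poll(); }
    int size() { return unprocessed.size(); }
    long droppedCount() { return dropped; }
}

final class DropOnSerdeErrorDemo {
    // Toy "serde": accepts only payloads that look like a JSON object.
    static String parse(byte[] raw) {
        String s = new String(raw, StandardCharsets.UTF_8);
        if (!s.startsWith("{") || !s.endsWith("}")) {
            throw new IllegalArgumentException("not a JSON object: " + s);
        }
        return s;
    }

    public static void main(String[] args) {
        DroppingBuffer<String> buffer = new DroppingBuffer<>(DropOnSerdeErrorDemo::parse);
        buffer.offer("{\"a\":1}".getBytes(StandardCharsets.UTF_8));
        buffer.offer("oops".getBytes(StandardCharsets.UTF_8));
        buffer.offer("{\"b\":2}".getBytes(StandardCharsets.UTF_8));
        System.out.println(buffer.size() + " buffered, " + buffer.droppedCount() + " dropped");
    }
}
```

Running main should report two buffered messages and one dropped. Since the thread suggests failing the container as the safest default, dropping like this would presumably be something a job opts into rather than the default behavior.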
