Hey Chris
This is my first stab at a diff for this.
I am new to Scala and Samza, and I am not clear on the best practices here.
I used an enumeration for the flag, but I was not sure if that is what you
wanted or just a boolean. I added a counter metric that is incremented on
each error, and the flag supports Drop, Fail, and Log.
I could not find a good place to configure this flag, because there is
nothing else for this class that is configured from the property file (that
I could tell).
I was assuming that I wanted to add the flag to the constructor which is
called in SamzaContainer.scala?
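To make the question concrete, here is a rough Java sketch of how I imagine
the flag being read from the job config before it is passed to the
constructor. The config key "task.serialization.error.behavior" and the
fromConfig helper are made up for illustration; I am not aware of either
existing in Samza. Fail is the default, as Chris suggested earlier in the
thread.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class SerializationErrorConfigSketch {
    // Mirrors the Drop/Log/Fail enumeration from the patch.
    enum SerializationErrorBehavior { DROP, LOG, FAIL }

    // Hypothetical config key, invented for this sketch.
    static final String BEHAVIOR_KEY = "task.serialization.error.behavior";

    // Returns FAIL when the key is absent or blank. An unrecognized value
    // makes Enum.valueOf throw IllegalArgumentException, so a typo in the
    // properties file fails fast instead of silently dropping messages.
    static SerializationErrorBehavior fromConfig(Map<String, String> config) {
        String raw = config.get(BEHAVIOR_KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return SerializationErrorBehavior.FAIL;
        }
        return SerializationErrorBehavior.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(fromConfig(config)); // prints FAIL
        config.put(BEHAVIOR_KEY, "drop");
        System.out.println(fromConfig(config)); // prints DROP
    }
}
```

If something like this is acceptable, SamzaContainer would parse the value
once and hand it to the SystemConsumers constructor.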
I had to enqueue a blank envelope, or I was getting the following
exception. Since I have to enqueue anyway (or am I missing something?), I
need to create a new envelope, and I am not sure what to put for the
key/message.
Exception in thread "main" java.util.NoSuchElementException: queue empty
    at scala.collection.mutable.Queue.dequeue(Queue.scala:47)
    at org.apache.samza.system.SystemConsumers$$anonfun$refresh$3.apply(SystemConsumers.scala:240)
    at org.apache.samza.system.SystemConsumers$$anonfun$refresh$3.apply(SystemConsumers.scala:237)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
    at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:237)
    at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:226)
    at org.apache.samza.container.SamzaContainer.process(SamzaContainer.scala:556)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:457)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:78)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
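For comparison, here is a small Java sketch of the alternative I would
prefer if it is workable: skip the bad message instead of enqueuing a
placeholder, and have the reading side tolerate an empty queue. This is a
toy model and not the real SystemConsumers code; Buffer, offer, poll, and
the Function-based deserializer are stand-ins I made up, and System.err
stands in for a DEBUG-level log call.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

public class DropOnSerdeErrorSketch {
    enum Behavior { DROP, LOG, FAIL }

    // Toy stand-in for the per-partition buffer of deserialized messages.
    static class Buffer<T> {
        private final Queue<T> unprocessed = new ArrayDeque<>();
        private final Function<byte[], T> deserializer;
        private final Behavior behavior;
        private long errorCount = 0;

        Buffer(Function<byte[], T> deserializer, Behavior behavior) {
            this.deserializer = deserializer;
            this.behavior = behavior;
        }

        // Deserialize and enqueue. On failure, count the error and either
        // rethrow (FAIL) or skip the message (DROP, LOG). Nothing is
        // enqueued for a skipped message.
        void offer(byte[] raw) {
            try {
                unprocessed.add(deserializer.apply(raw));
            } catch (RuntimeException e) {
                errorCount++;
                if (behavior == Behavior.FAIL) {
                    throw e;
                }
                if (behavior == Behavior.LOG) {
                    System.err.println("serialization error: " + e);
                }
            }
        }

        // Returns null when the queue is empty instead of throwing.
        T poll() {
            return unprocessed.poll();
        }

        long getErrorCount() {
            return errorCount;
        }
    }

    public static void main(String[] args) {
        Buffer<Integer> buffer = new Buffer<>(
            bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)),
            Behavior.DROP);
        buffer.offer("1".getBytes(StandardCharsets.UTF_8));
        buffer.offer("oops".getBytes(StandardCharsets.UTF_8)); // dropped
        buffer.offer("3".getBytes(StandardCharsets.UTF_8));
        System.out.println(buffer.poll());          // prints 1
        System.out.println(buffer.poll());          // prints 3
        System.out.println(buffer.poll());          // prints null
        System.out.println(buffer.getErrorCount()); // prints 1
    }
}
```

Whether the real refresh() can be changed to tolerate an empty queue like
this is exactly what I am unsure about.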
On Wed, Feb 5, 2014 at 6:33 PM, Chris Riccomini <[email protected]>wrote:
> Hey Danny,
>
> Yep, pretty much.
>
> We should also have a counter metric to measure how many messages were
> dropped, and a config setting to toggle the dropping behavior.
>
> Logging is a little tricky. I'm kind of sensitive to spewing a whole bunch
> of "we just dropped your message from system stream partition X" log lines
> in the container logs. It's conceivable that folks might have a ton of bad
> messages, and logging every one would slow down the processing, and also
> fill the disk. Alternatively, it'd be really useful to know that messages
> had been dropped. The best I can come up with is to set it at DEBUG, so
> it's not verbose, but still there if people want it.
>
> Cheers,
> Chris
>
> On 2/5/14 6:13 PM, "Danny Antonetti" <[email protected]> wrote:
>
> >So it sounds like this would just be a try catch around this line
> >in SystemConsumers.scala
> >
> >unprocessedMessages(envelope.getSystemStreamPartition).enqueue(serdeManager.fromBytes(envelope))
> >
> >And just do some logging in the catch statement?
> >
> >
> >On Wed, Feb 5, 2014 at 12:32 PM, Chris Riccomini
> ><[email protected]>wrote:
> >
> >> Hey Guys,
> >>
> >> Thinking on this more. Given that we don't have a good grasp on this,
> >> and it seems fairly complicated for now, I'm proposing that we hold off
> >> on all this complex work, and just support the ability to drop messages
> >> for now. This should be pretty straightforward to implement, and is
> >> generally pretty useful, and not too invasive.
> >>
> >> Does that sound cool with folks? Danny?
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 2/5/14 11:43 AM, "Chris Riccomini" <[email protected]> wrote:
> >>
> >> >Hey Martin,
> >> >
> >> >I agree with you about ordering. It would be ideal to give the error()
> >> >callback exactly when we would normally call process() on the message,
> >> >but can't because of the serde error. I think this is do-able if we
> >> >tweak things in SystemConsumers a bit.
> >> >
> >> >The problem I'm having trouble resolving is how to handle the
> >> >MessageChooser. The MessageChooser chooses the processing order between
> >> >partitions ("I have a message from stream/partition X, and
> >> >stream/partition Y, which one do I process next?"). These choosers
> >> >occasionally look at the actual message payload (e.g. the timestamp
> >> >field of an incoming message) when doing the choosing. The chooser
> >> >won't know how to handle an IncomingMessageEnvelope where the key and
> >> >value are both byte[], when it's expecting Avro, ProtoBuf, JSON, etc.
> >> >One solution I can think of would be to add an error() call to the
> >> >MessageChooser interface. The other solution would be to just give the
> >> >MessageChooser everything, and in cases where it is looking at the
> >> >message payload, it just has to be careful. I think there might be
> >> >other solutions here as well.
> >> >
> >> >More thought required.
> >> >
> >> >Cheers,
> >> >Chris
> >> >
> >> >On 2/4/14 4:13 PM, "Martin Kleppmann" <[email protected]> wrote:
> >> >
> >> >>Ok -- I had originally misunderstood your ErrorTask proposal. The way
> >> >>you described it looks good to me. I agree that a single error method,
> >> >>which takes a generic error envelope object, is better than lots of
> >> >>different methods.
> >> >>
> >> >>Rather than
> >> >>
> >> >>if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode()))
> >> >>
> >> >>could we just say this?
> >> >>
> >> >>if (envelope.getError() instanceof DeserializationError)
> >> >>
> >> >>Regarding ordering: my instinctive reaction is that it would be least
> >> >>surprising if the error is received at the same point in the message
> >> >>sequence as the message would have been received, had it not been an
> >> >>error. But that may need some more thinking.
> >> >>
> >> >>Cheers,
> >> >>Martin
> >> >>
> >> >>On 4 Feb 2014, at 22:23, Chris Riccomini <[email protected]> wrote:
> >> >>> Hey Guys,
> >> >>>
> >> >>> One other thing to consider here, that didn't strike me at first, is
> >> >>> how to handle ordering when a serialization error occurs.
> >> >>>
> >> >>> Right now, the SystemConsumers class reads messages in, buffers them,
> >> >>> and feeds them slowly to the MessageChooser. The MessageChooser takes
> >> >>> only IncomingMessageEnvelope right now. So the question is, if we have
> >> >>> a serde error, when does the StreamTask see it? It shouldn't see it
> >> >>> before other messages for the same stream partition, since this would
> >> >>> mean we've broken the ordering of the messages. Arguably, the
> >> >>> MessageChooser should also get a chance to see it, to choose in what
> >> >>> order to process it.
> >> >>>
> >> >>> One could make the argument, though, that a message that can't be
> >> >>> deserialized is invalid, and you're looking at potential data loss
> >> >>> anyway, so ordering doesn't matter. I'm not sure if this assumption is
> >> >>> true in all cases though.
> >> >>>
> >> >>> I don't really have a good answer for this at the moment. I think we
> >> >>> should think about it.
> >> >>>
> >> >>> Cheers,
> >> >>> Chris
> >> >>>
> >> >>> On 2/4/14 11:36 AM, "Danny Antonetti" <[email protected]> wrote:
> >> >>>
> >> >>>> OK Great, that looks good.
> >> >>>>
> >> >>>> I will look into this as I have time and get back to you with
> >> >>>> questions.
> >> >>>>
> >> >>>>
> >> >>>> Thanks for the help.
> >> >>>>
> >> >>>>
> >> >>>> Danny
> >> >>>>
> >> >>>>
> >> >>>> On Tue, Feb 4, 2014 at 11:29 AM, Chris Riccomini
> >> >>>> <[email protected]>wrote:
> >> >>>>
> >> >>>>> Hey Guys,
> >> >>>>>
> >> >>>>> Here's a pseudo-code proposal for an error API.
> >> >>>>>
> >> >>>>> interface ErrorTask {
> >> >>>>>   public void error(ErrorEnvelope envelope, MessageCollector collector,
> >> >>>>>       TaskCoordinator coordinator);
> >> >>>>> }
> >> >>>>>
> >> >>>>> interface ErrorEnvelope {
> >> >>>>>   ErrorCode getErrorCode();
> >> >>>>>   Object getError();
> >> >>>>> }
> >> >>>>>
> >> >>>>> class DeserializationError {
> >> >>>>>   byte[] getKey();
> >> >>>>>   byte[] getMessage();
> >> >>>>>   boolean keyFailed();
> >> >>>>>   boolean valueFailed();
> >> >>>>> }
> >> >>>>>
> >> >>>>> Then you could implement something like:
> >> >>>>>
> >> >>>>> class MyStreamTask implements StreamTask, ErrorTask {
> >> >>>>>   public void process(...) {
> >> >>>>>     // do stuff
> >> >>>>>   }
> >> >>>>>
> >> >>>>>   public void error(ErrorEnvelope envelope, MessageCollector collector,
> >> >>>>>       TaskCoordinator coordinator) {
> >> >>>>>     if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())) {
> >> >>>>>       DeserializationError error = (DeserializationError) envelope.getError();
> >> >>>>>       collector.send("kafka", "bad-data", error.getKey(), error.getMessage());
> >> >>>>>     }
> >> >>>>>   }
> >> >>>>> }
> >> >>>>>
> >> >>>>> Arguably, we've just moved our big if-statement block from the
> >> >>>>> process() method into the ErrorTask.error method, but the
> >> >>>>> alternative, as I see it, is to have one callback per error type.
> >> >>>>> This moves us into a state where every time we want to add a new
> >> >>>>> callback, we break all existing implementations (since they must now
> >> >>>>> implement the new method, as well). I chose the single callback and
> >> >>>>> generic object approach because it matches what our process method
> >> >>>>> does when multiple input streams are defined (if envelope is from
> >> >>>>> stream A, do X, if envelope is from stream B, do Y), but I'm open to
> >> >>>>> suggestions if people have them.
> >> >>>>>
> >> >>>>> Regarding your JSON use case, yes, I think you'd be given the raw
> >> >>>>> bytes for the key and message, and it'd be up to you to decide what
> >> >>>>> to do with them. Samza allows you to define systems with no serde.
> >> >>>>> Systems without a serde default to byte[] for both key and value,
> >> >>>>> which means that you could define a system (or stream) with a
> >> >>>>> pass-through serde, and send it the raw bytes for the key and
> >> >>>>> message. Alternatively, you could try decoding the data using some
> >> >>>>> other serde, posting the message to a DB, logging the message in
> >> >>>>> Log4j, discarding the message, etc.
> >> >>>>>
> >> >>>>> Cheers,
> >> >>>>> Chris
> >> >>>>>
> >> >>>>> On 2/4/14 10:48 AM, "Danny Antonetti" <[email protected]> wrote:
> >> >>>>>
> >> >>>>>> Hey Chris,
> >> >>>>>>
> >> >>>>>> I am not sure I understand your suggestion about the ErrorTask.
> >> >>>>>> What would this new function's method signature be? I would assume
> >> >>>>>> it would take in the byte[] from the fromBytes function. It seems
> >> >>>>>> like that ties the Serde implementation to the StreamTask
> >> >>>>>> implementation. Unless you are suggesting that it should be notified
> >> >>>>>> without giving the input byte[].
> >> >>>>>>
> >> >>>>>> In our case we are using the JsonSerde, so the byte[] for the JSON
> >> >>>>>> data would be given to onDeserializationError and then our task
> >> >>>>>> would have to decode the bytes?
> >> >>>>>>
> >> >>>>>> Just to clarify my suggestion, I was not thinking that we would have
> >> >>>>>> a predefined set of behaviors. I was thinking that I would have an
> >> >>>>>> interface (that would not be part of the StreamTask), maybe
> >> >>>>>> ErrorHandler. With this option there would be implementations of
> >> >>>>>> that interface for dropping the message or redirecting it to a new
> >> >>>>>> queue. But this would require extra configuration as you mentioned.
> >> >>>>>>
> >> >>>>>> I am not invested in my approach, I just wanted to make sure that I
> >> >>>>>> understand the suggestions/options.
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> Thanks
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> Danny
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini
> >> >>>>>> <[email protected]>wrote:
> >> >>>>>>
> >> >>>>>>> Hey Danny,
> >> >>>>>>>
> >> >>>>>>> I can think of two ways to accomplish this.
> >> >>>>>>>
> >> >>>>>>> The first way is essentially what you've described. Allow the
> >> >>>>>>> framework to have a pre-defined set of options defined via config
> >> >>>>>>> (drop the message, re-route to another topic, etc).
> >> >>>>>>>
> >> >>>>>>> The second way is to catch the serialization issues, and still pass
> >> >>>>>>> the failed message to the StreamTask. The way in which the StreamTask
> >> >>>>>>> would be notified of the failure is up for debate. One option would
> >> >>>>>>> be to have an ErrorTask interface, which has an
> >> >>>>>>> onDeserializationError callback. No new configuration would be
> >> >>>>>>> required in this case--simply implementing the ErrorTask means you
> >> >>>>>>> get notified of any serialization errors (rather than failing the
> >> >>>>>>> container, which would be the default). Another option would be to
> >> >>>>>>> have the IncomingMessageEnvelope have an error flag, which we could
> >> >>>>>>> use to denote the serialization failure.
> >> >>>>>>>
> >> >>>>>>> I like the second approach because it's more generic. Instead of
> >> >>>>>>> pre-defining exact behavior when a failure occurs, it seems more
> >> >>>>>>> flexible to let the StreamTask know, and let the developer decide
> >> >>>>>>> what the appropriate action is. I hadn't even thought of the
> >> >>>>>>> re-route case you brought up, and I'm sure there are many other
> >> >>>>>>> possible actions that we're not thinking of right now. Of the second
> >> >>>>>>> approach's potential implementation options, I favor the ErrorTask
> >> >>>>>>> approach right now, but I haven't dug into it too much.
> >> >>>>>>>
> >> >>>>>>> Regardless of which way we choose, I think the default should be to
> >> >>>>>>> fail the container. This is the safest behavior, as it means there
> >> >>>>>>> will be no data loss, and developers will be alerted of the
> >> >>>>>>> serialization error.
> >> >>>>>>>
> >> >>>>>>> As far as implementation goes, the four classes that are probably of
> >> >>>>>>> most interest to you are SamzaContainer, TaskInstance,
> >> >>>>>>> SystemConsumers, and SerdeManager. You'll need to catch the serde
> >> >>>>>>> exceptions somewhere in the SystemConsumers or SerdeManager class,
> >> >>>>>>> and implement your new logic there. I think the best approach is to
> >> >>>>>>> read over these four classes, and ask us any questions you might
> >> >>>>>>> have.
> >> >>>>>>>
> >> >>>>>>> Cheers,
> >> >>>>>>> Chris
> >> >>>>>>>
> >> >>>>>>> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:
> >> >>>>>>>
> >> >>>>>>>> We have discussed this, and it is something that we want to look
> >> >>>>>>>> into.
> >> >>>>>>>>
> >> >>>>>>>> Do you have any thoughts on how to implement this feature?
> >> >>>>>>>> I assume you would want the failure behavior to be configurable.
> >> >>>>>>>>
> >> >>>>>>>> Like
> >> >>>>>>>> Drop the message,
> >> >>>>>>>> Send a message to a new queue, and drop.
> >> >>>>>>>> Fail the container (is that ever appropriate?)
> >> >>>>>>>> Anything else?
> >> >>>>>>>>
> >> >>>>>>>> I am not familiar with this code base.
> >> >>>>>>>> Do you have a suggestion on what classes I should be looking to
> >> >>>>>>>> modify?
> >> >>>>>>>>
> >> >>>>>>>> Is there someone who I should bounce ideas off of?
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Thanks
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Danny
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> It's not intentional, error handling just hasn't been added yet.
> >> >>>>>>>>> If you're interested, we'd love to have the contribution. In
> >> >>>>>>>>> particular, take a look at SAMZA-59
> >> >>>>>>>>> (https://issues.apache.org/jira/browse/SAMZA-59), which also
> >> >>>>>>>>> touches on how serdes should handle error conditions.
> >> >>>>>>>>>
> >> >>>>>>>>> Thanks,
> >> >>>>>>>>> Jakob
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
> >> >>>>>>>>> <[email protected]>wrote:
> >> >>>>>>>>>
> >> >>>>>>>>>> I am currently using the JsonSerde/JsonSerdeFactory classes for
> >> >>>>>>>>>> serializing Kafka messages.
> >> >>>>>>>>>>
> >> >>>>>>>>>> I have noticed that if there is bad JSON input coming in through
> >> >>>>>>>>>> Kafka, the Samza container seems to crash.
> >> >>>>>>>>>>
> >> >>>>>>>>>> I was looking at JsonSerde.scala, which does not seem to have
> >> >>>>>>>>>> any error handling.
> >> >>>>>>>>>>
> >> >>>>>>>>>> So I was curious if this was intentional?
> >> >>>>>>>>>> Or if there is a different way to handle these types of input
> >> >>>>>>>>>> errors?
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> Danny
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>
> >> >>>>>
> >> >>>
> >> >>
> >> >
> >>
> >>
>
>
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index cdba7fe..94520a0 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -79,6 +79,22 @@ class SystemConsumers(
noNewMessagesTimeout: Long = 10) extends Logging {
/**
+ * Specifies the behavior when a serialization error is encountered
+ */
+ object SerializationErrorBehavior extends Enumeration {
+ type SerializationErrorBehavior = Value
+ val
+ // drop the message, but do not fail the container
+ Drop,
+
+ // log the message but nothing else
+ Log,
+
+ // fail the entire container, this should be the default
+ Fail = Value
+ }
+
+ /**
* A buffer of incoming messages grouped by SystemStreamPartition.
*/
var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
@@ -135,6 +151,17 @@ class SystemConsumers(
*/
val depletedQueueSizeThreshold = (maxMsgsPerStreamPartition * (1 - fetchThresholdPct)).toInt
+ /**
+ * Used to configure the behavior when an exception is thrown from the deserializer
+ */
+ val serializationErrorBehavior = SerializationErrorBehavior.Log
+
+ /**
+ * A counter used to keep track of the number of serialization exceptions
+ * that have been encountered
+ */
+ val serializationErrorCounter = metrics.newCounter("serializationError")
+
debug("Got stream consumers: %s" format consumers)
debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition)
debug("Got no new message timeout: %s" format noNewMessagesTimeout)
@@ -255,8 +282,25 @@ class SystemConsumers(
debug("Updated fetch map for: %s, %s" format (systemStreamPartition, fetchMap))
- unprocessedMessages(envelope.getSystemStreamPartition).enqueue(serdeManager.fromBytes(envelope))
-
+ try {
+ unprocessedMessages(envelope.getSystemStreamPartition).enqueue(serdeManager.fromBytes(envelope))
+ } catch {
+ case ex: Exception =>
+ serializationErrorCounter.inc(1)
+ serializationErrorBehavior match {
+ case SerializationErrorBehavior.Log => debug("serialization error : %s" format ex)
+ case SerializationErrorBehavior.Drop =>
+ case default => throw ex
+ }
+
+ // we need to add a message here or we will get a "queue empty" exception later on
+ unprocessedMessages(envelope.getSystemStreamPartition).enqueue(new IncomingMessageEnvelope(
+ envelope.getSystemStreamPartition,
+ envelope.getOffset,
+ null,
+ null))
+ }
+
debug("Updated unprocessed messages for: %s, %s" format (systemStreamPartition, unprocessedMessages))
})
}