Hi,
I have checked the KIP-399 and the discussion and also KIP-210.
So the question we need to answer is whether it's okay to also skip
writing the record to the internal topics. The current implementation of
`ProductionExceptionHandler` is applied to all topics; if we decide
to keep it that way, how do we ensure that the local store and the
changelog topic do not diverge?
I would like to get input from others on what they think about this.
There is one point that I don't understand: why, in case of a
serialization error, do we have the choice of either skipping the record
or putting it in the store? Shouldn't the record be correctly serialized
before it is put into the store?
On 13/12/2018 14:13, Matthias J. Sax wrote:
For store updates, records are first serialized and afterwards put into
the store and written into the changelog topic.
In the current implementation, if the send() into the changelog topic
produces an error and the handler skips over it, the local store content
and the changelog topic diverge. This seems to be a correctness issue.
For a serialization error, the store and the changelog cannot diverge,
because serialization happens before put/send. Thus, with this KIP we
could skip both put() and send(). However, I am still wondering if it
would be ok to skip a store update for this case? (Btw: the current PR
does not address this atm; a serialization error for a store write would
not be covered and would kill the instance.)
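To make the two cases above concrete, here is a minimal sketch of the serialize / put() / send() ordering. It is not Kafka Streams code: `StoreChangelogSketch`, its `Response` enum, and the toy `serialize()` that only accepts `Integer` values are all hypothetical stand-ins, and the store and changelog are modelled as plain in-memory collections.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a state store backed by a changelog topic.
class StoreChangelogSketch {
    enum Response { CONTINUE, FAIL }

    final Map<String, String> store = new HashMap<>();
    final List<String> changelog = new ArrayList<>();

    // Order described in the thread: serialize first, then put(), then send().
    void update(String key, Object value, boolean sendFails, Response onError) {
        String serialized;
        try {
            serialized = serialize(value);
        } catch (IllegalArgumentException e) {
            if (onError == Response.FAIL) {
                throw e;
            }
            // Skipped before put() and send(): store and changelog stay in sync.
            return;
        }
        store.put(key, serialized);
        if (sendFails) {
            if (onError == Response.FAIL) {
                throw new IllegalStateException("send to changelog failed");
            }
            // Skipped after put(): the store has the value, the changelog does not.
            return;
        }
        changelog.add(key + "=" + serialized);
    }

    // Toy serializer (assumption): anything that is not an Integer fails.
    static String serialize(Object value) {
        if (!(value instanceof Integer)) {
            throw new IllegalArgumentException("cannot serialize " + value);
        }
        return value.toString();
    }
}
```

In this model, skipping on a serialization error drops the update from both the store and the changelog, whereas skipping on a send error leaves an entry in the store that a restore from the changelog would not reproduce.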
IIRC, the original idea of the KIP was to allow skipping over records for
output topics only. That's why I am wondering if it's ok to allow
skipping over records in repartition topics, too.
In the end, it's some data loss in all 3 cases, so maybe it's ok to
allow skipping for all 3 cases. However, we should not allow the local
store and the changelog topic to diverge IMHO (which might be an
orthogonal bug, though).
I also don't have an answer or preference. I just think it's important to
touch on those cases and get input on how people think about them.
-Matthias
On 12/11/18 11:43 AM, Kamal Chandraprakash wrote:
Matthias,
For changelog topics, I think it does not make sense to allow skipping
records if serialization fails? For internal repartition topics, I am
not sure if we should allow it or not. Would you agree with this? We
should discuss the implications to derive a sound design.
Can you explain the issue that occurs when records written to the
changelog / internal repartition topics are skipped, so that I can look
into it?
On Fri, Dec 7, 2018 at 12:07 AM Matthias J. Sax <matth...@confluent.io>
wrote:
To accept different types of records from multiple topologies, I have to
define the ProducerRecord without generics.
Yes. It does make sense. My point was that the KIP should
mention/explain this explicitly to allow others not familiar with the
code base to understand it more easily :)
About `ClassCastException`: seems to be an implementation detail. No
need to make it part of the KIP discussion.
One more thing that came to my mind. We use the `RecordCollector` to
write into all topics, ie, user output topics and internal repartition
and changelog topics.
For changelog topics, I think it does not make sense to allow skipping
records if serialization fails? For internal repartition topics, I am
not sure if we should allow it or not. Would you agree with this? We
should discuss the implications to derive a sound design.
I was also just double checking the code, and it seems that the current
`ProductionExceptionHandler` is applied to all topics. This seems to be
incorrect to me. Seems we missed this case when doing KIP-210? (Or did
we discuss this and I cannot remember? Might be worth double checking.)
Last thought: of course, the handler will know which topic is affected
and can provide a corresponding implementation. Was just wondering if we
should be more strict?
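As a rough illustration of a handler that decides per topic, here is a minimal sketch. It does not use the real Streams interface: `TopicAwareHandlerSketch` and its `Response` enum are hypothetical stand-ins, and detecting internal topics by the `-changelog` / `-repartition` suffix is an assumption based on the Streams naming convention (a user topic could carry the same suffix).

```java
// Hypothetical stand-in for a topic-aware production exception handler.
class TopicAwareHandlerSketch {
    enum Response { CONTINUE, FAIL }

    // Assumption: internal topics are recognised purely by their name suffix.
    static boolean isInternalTopic(String topic) {
        return topic.endsWith("-changelog") || topic.endsWith("-repartition");
    }

    // Never skip a record destined for an internal topic; skip for output topics.
    static Response handle(String topic, Exception exception) {
        return isInternalTopic(topic) ? Response.FAIL : Response.CONTINUE;
    }
}
```

Being "more strict" would mean enforcing the FAIL branch inside Streams itself rather than relying on every user handler to implement a check like this.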
-Matthias
On 12/6/18 10:01 AM, Kamal Chandraprakash wrote:
Matt,
I agree with Matthias on not altering the serializer, as it's used by
multiple components.
Matthias,
- the proposed method accepts a `ProducerRecord` -- it might be good to
explain why this cannot be done in a type safe way (ie, missing generics)
To accept different types of records from multiple topologies, I have to
define the ProducerRecord without generics.
- `AlwaysProductionExceptionHandler` ->
`AlwaysContinueProductionExceptionHandler`
Fixed the typo in the KIP.
- `DefaultProductionExceptionHandler` is not mentioned
The `handleSerializationException` method in the
`ProductionExceptionHandler` interface will have a default implementation
that returns FAIL.
This is done to avoid any changes in user implementations, so I had not
mentioned the `DefaultProductionExceptionHandler` class. Updated the KIP.
- Why do you distinguish between `ClassCastException` and "any other
unchecked exception"? The second case seems to include the first one?
In SinkNode.java#93
<https://github.com/apache/kafka/blob/87cc31c4e7ea36e7e832a1d02d71480a91a75293/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java#L93>,
on hitting a `ClassCastException` we halt the streams application, as
it's a fatal error.
To keep the original behavior, I have to distinguish between the
exceptions.
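The default `handleSerializationException` mentioned above can be sketched as follows. This is not the actual Streams interface: `Handler`, `Record`, and `Response` are simplified, hypothetical stand-ins (in particular, `Record` stands in for the `ProducerRecord` without generics), and only the FAIL default is taken from this thread.

```java
// Hypothetical sketch of adding a defaulted method to an existing handler interface.
class DefaultMethodSketch {
    enum Response { CONTINUE, FAIL }

    // Stand-in for an untyped ProducerRecord: key and value are plain Objects.
    static final class Record {
        final String topic;
        final Object key;
        final Object value;

        Record(String topic, Object key, Object value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    interface Handler {
        // Existing callback for errors raised while sending a record.
        Response handle(Record record, Exception exception);

        // New callback; the default keeps existing implementations compiling
        // and preserves the fail-fast behavior unless a user overrides it.
        default Response handleSerializationException(Record record, Exception exception) {
            return Response.FAIL;
        }
    }

    // A pre-existing user handler that knows nothing about the new method.
    static final class ExistingHandler implements Handler {
        @Override
        public Response handle(Record record, Exception exception) {
            return Response.CONTINUE;
        }
    }
}
```

With this shape, `ExistingHandler` needs no changes: it keeps returning CONTINUE for send errors and inherits FAIL for serialization errors.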
On Thu, Dec 6, 2018 at 10:44 PM Matthias J. Sax <matth...@confluent.io>
wrote:
Well, that's exactly the point. The serializer should not be altered
IMHO because this would have an impact on other components. Also,
applications that use KafkaProducer directly can catch any
serialization exception and react to it. Hence, I don't see a
reason to change the serializer interface.
Instead, it seems better to solve this issue in Streams by allowing a
record to be skipped in this case.
Some more comments on the KIP:
- the proposed method accepts a `ProducerRecord` -- it might be good to
explain why this cannot be done in a type safe way (ie, missing
generics)
- `AlwaysProductionExceptionHandler` ->
`AlwaysContinueProductionExceptionHandler`
- `DefaultProductionExceptionHandler` is not mentioned
- Why do you distinguish between `ClassCastException` and "any other
unchecked exception"? The second case seems to include the first one?
-Matthias
On 12/6/18 8:35 AM, Matt Farmer wrote:
Ah, good point.
Should we consider altering the serializer interface to permit not
sending the record?
On Wed, Dec 5, 2018 at 9:23 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:
Matt,
That's a good point. If these cases are handled in the serializer, then
one cannot continue the stream processing by skipping the record.
To continue, you may have to send an empty serialized key/value
(new byte[0]) downstream on hitting the error, which may cause
unintended results.
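A minimal sketch of that workaround, assuming a toy serializer that only understands `String` values (`SwallowingSerializerSketch` is a hypothetical class, not the Kafka `Serializer` interface):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical serializer that swallows errors instead of throwing.
class SwallowingSerializerSketch {
    static byte[] serialize(Object value) {
        if (value == null) {
            return null; // pass nulls through unchanged
        }
        try {
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        } catch (ClassCastException e) {
            // The record cannot be dropped from here, so an empty payload
            // is emitted and still travels downstream.
            return new byte[0];
        }
    }
}
```

Here a failed record and a legitimate empty string both serialize to a zero-length array, so downstream consumers cannot tell them apart.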
On Wed, Dec 5, 2018 at 8:41 PM Matt Farmer <m...@frmr.me> wrote:
Hi there,
Thanks for this KIP.
What’s the thinking behind doing this in ProductionExceptionHandler
versus handling these cases in your serializer implementation?
On Mon, Dec 3, 2018 at 1:09 AM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:
Hello dev,
I would like to initiate the discussion for KIP-399: Extend
ProductionExceptionHandler to cover serialization exceptions.
KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions
JIRA: https://issues.apache.org/jira/browse/KAFKA-7499
All feedback will be highly appreciated.
Thanks,
Kamal Chandraprakash