AHeise commented on code in PR #107:
URL:
https://github.com/apache/flink-connector-kafka/pull/107#discussion_r1669917010
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -411,11 +411,22 @@ private void registerMetricSync() {
* Logic needs to be invoked by write AND flush since we support various semantics.
*/
private void checkAsyncException() throws IOException {
- // reset this exception since we could close the writer later on
Exception e = asyncProducerException;
if (e != null) {
-
- asyncProducerException = null;
+ // In old version, asyncProducerException will be set to null here, which causes another
+ // bug [FLINK-35749]
+ // once asyncProducerException is set to null here, the MailboxExecutor thread in
+ // WriterCallback.onCompletion may also invoke this method checkAsyncException and get
+ // chance to make
+ // asyncProducerException be null before the function flush() and write() seeing the
+ // exception.
+ // After that, the flush and writer functions can continue to write next message batch
+ // to kafka, the old
+ // in-flight message batch with exceptions get lost.
+ // We hope that once some exceptions thrown during sending kafka, KafkaWriter could be
+ // fast-failed and
+ // trigger global restarting, then asyncProducerException will be initialized as null
+ // again.
Review Comment:
It's a good point to raise whether we need to reset at all. I'm assuming
@mas-chen did it to emulate the previous behavior.
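The race described in the diff's comment can be replayed deterministically in a single thread. This is a minimal sketch, not Flink's KafkaWriter: `ResettingWriter`, `runCallbackCheck()` and `RaceDemo` are hypothetical names, and it assumes (as the diff comment describes) that an exception consumed by the callback-side check never reaches write() or flush().

```java
import java.io.IOException;

// Hypothetical stand-in for a writer; not Flink's KafkaWriter.
class ResettingWriter {
    // volatile because in a real writer the send callback may run on another thread.
    volatile Exception asyncProducerException;
    final boolean resetOnCheck;
    int batchesWritten = 0;

    ResettingWriter(boolean resetOnCheck) {
        this.resetOnCheck = resetOnCheck;
    }

    void checkAsyncException() throws IOException {
        Exception e = asyncProducerException;
        if (e != null) {
            if (resetOnCheck) {
                // Old behavior: clear the field once the failure has been observed.
                asyncProducerException = null;
            }
            throw new IOException("async send failed", e);
        }
    }

    // Simulated write(): should fail fast if an earlier batch failed.
    void write() throws IOException {
        checkAsyncException();
        batchesWritten++;
    }

    // Simulated callback-side check. Assumption: its exception is swallowed,
    // i.e. it never reaches write()/flush() (the loss the diff comment describes).
    void runCallbackCheck() {
        try {
            checkAsyncException();
        } catch (IOException swallowed) {
            // intentionally ignored in this simulation
        }
    }
}

class RaceDemo {
    // Returns how many batches were written after batch 1 failed asynchronously.
    static int simulate(boolean resetOnCheck) {
        ResettingWriter w = new ResettingWriter(resetOnCheck);
        w.asyncProducerException = new RuntimeException("broker unavailable");
        w.runCallbackCheck(); // the callback path happens to check first
        try {
            w.write(); // next batch
        } catch (IOException expected) {
            // fail fast: the next batch is not written
        }
        return w.batchesWritten;
    }

    public static void main(String[] args) {
        System.out.println("reset:    " + simulate(true));  // 1 (failure was lost)
        System.out.println("no reset: " + simulate(false)); // 0 (write failed fast)
    }
}
```

With the reset, the callback-side check clears the field and the next write() goes through; without it, write() still sees the failure and fails fast.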
From what I can see, the only real downside of not resetting is that we
rethrow the exception in close() (which is always called, including in case of
errors). That could prevent proper cleanup and will count exceptions twice in
most cases, which inflates the metrics. I think the latter point has
significant side effects: I expect most users to have alerts on the exception
metrics, so this behavior change could alert more users than previous versions
did. In the worst case, some poor data engineer will be paged in the middle of
the night because of it, and I would expect a few alerts to be retuned to be
less noisy.
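The metric and cleanup concern can be illustrated the same way. In this hypothetical sketch (`CountingWriter` and `MetricDemo` are illustrative names, not Flink APIs), every exception escaping write() or close() is assumed to bump an exceptions counter, and close() naively checks for the async failure before releasing resources.

```java
import java.io.IOException;

// Hypothetical writer used only to count how often one async failure is surfaced.
class CountingWriter {
    volatile Exception asyncException;
    final boolean resetOnCheck;
    int numExceptionsCounted = 0; // stand-in for an exceptions metric
    boolean resourcesReleased = false;

    CountingWriter(boolean resetOnCheck) {
        this.resetOnCheck = resetOnCheck;
    }

    void checkAsyncException() throws IOException {
        Exception e = asyncException;
        if (e != null) {
            if (resetOnCheck) {
                asyncException = null;
            }
            throw new IOException("async send failed", e);
        }
    }

    void write() throws IOException {
        checkAsyncException();
    }

    // close() is called even after a failure, so it re-checks too.
    void close() throws IOException {
        checkAsyncException();    // without a reset, this rethrows the same failure...
        resourcesReleased = true; // ...and this cleanup line is skipped
    }
}

class MetricDemo {
    // Assumption: every exception escaping write() or close() is counted once.
    static CountingWriter run(boolean resetOnCheck) {
        CountingWriter w = new CountingWriter(resetOnCheck);
        w.asyncException = new RuntimeException("broker unavailable");
        try { w.write(); } catch (IOException e) { w.numExceptionsCounted++; }
        try { w.close(); } catch (IOException e) { w.numExceptionsCounted++; }
        return w;
    }

    public static void main(String[] args) {
        CountingWriter reset = run(true);
        CountingWriter noReset = run(false);
        System.out.println("reset:    counted=" + reset.numExceptionsCounted
                + ", released=" + reset.resourcesReleased);   // counted=1, released=true
        System.out.println("no reset: counted=" + noReset.numExceptionsCounted
                + ", released=" + noReset.resourcesReleased); // counted=2, released=false
    }
}
```

A real close() would more likely put cleanup in a finally block; the naive ordering here only makes the cleanup concern visible alongside the double count.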
All in all, I think resetting the exception would be good IF we are all sure
that no exception can be missed. Being sure that we can't end up with corrupted
state is of course more important than having accurate metrics, but ideally we
have both.
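One hypothetical way to get both properties is to keep the failure sticky (never cleared, so no path can hide it from write()) while counting it only once. This is a design sketch under stated assumptions, not what the PR implements: `StickyWriter`, `onSendFailure()` and `StickyDemo` are invented names, the metric is the writer's own counter, and close() only rethrows a failure that no other path has reported yet.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: the failure is never cleared, so no code path can hide
// it from write(), but the metric counts it only once.
class StickyWriter {
    private final AtomicReference<Exception> asyncException = new AtomicReference<>();
    private final AtomicBoolean reported = new AtomicBoolean(false);
    final AtomicInteger numExceptions = new AtomicInteger(); // stand-in for the metric
    volatile boolean resourcesReleased = false;
    int batchesWritten = 0;

    // Called from the send callback; keeps only the first failure.
    void onSendFailure(Exception e) {
        asyncException.compareAndSet(null, e);
    }

    void checkAsyncException() throws IOException {
        Exception e = asyncException.get();
        if (e != null) {
            // Count once, no matter how many paths observe the failure.
            if (reported.compareAndSet(false, true)) {
                numExceptions.incrementAndGet();
            }
            throw new IOException("async send failed", e);
        }
    }

    void write() throws IOException {
        checkAsyncException(); // still fails after another path saw the failure
        batchesWritten++;
    }

    void close() throws IOException {
        try {
            // Only surface a failure nobody has reported yet
            // (e.g. one that arrived after the last write()).
            if (!reported.get()) {
                checkAsyncException();
            }
        } finally {
            resourcesReleased = true; // cleanup runs even if close() throws
        }
    }
}

class StickyDemo {
    public static void main(String[] args) throws IOException {
        StickyWriter w = new StickyWriter();
        w.onSendFailure(new RuntimeException("broker unavailable"));
        try {
            w.checkAsyncException(); // callback-side check; assume it is swallowed
        } catch (IOException swallowed) {
            // ignored, as in the race scenario
        }
        try {
            w.write();
        } catch (IOException expected) {
            // write() still fails fast
        }
        w.close(); // does not rethrow: the failure was already reported
        System.out.println("written=" + w.batchesWritten
                + ", counted=" + w.numExceptions.get()
                + ", released=" + w.resourcesReleased); // written=0, counted=1, released=true
    }
}
```

The trade-off is an extra flag to reason about: the `reported` check in close() is not atomic with the rethrow, which is acceptable for a sketch but would need care in a real writer.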