AHeise commented on code in PR #107:
URL: https://github.com/apache/flink-connector-kafka/pull/107#discussion_r1666693262
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -449,12 +460,17 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
}
// Checking for exceptions from previous writes
- mailboxExecutor.submit(
+            // Notice: throwing an exception in the mailboxExecutor thread is not safe enough
+            // to trigger a global failover, which has been fixed in [FLINK-31305]. And using
+            // mailboxExecutor.execute() is better than mailboxExecutor.submit() since submit
+            // will swallow exceptions inside.
+            mailboxExecutor.execute(
Review Comment:
Yes, this is the actual fix: using `execute` instead of `submit`. This bug was
introduced through FLINK-31305.
Note: `submit` does not swallow exceptions. It's an anti-pattern to use
`submit` without looking at the Future; the Future holds the result/exception
to be handled asynchronously somewhere. If we bubble up the exception, the task
dies without the call site being able to properly react to it.
So please remove the second part of your comment, from "And using ...".
I will amend the javadoc of the mailbox executor to make it clear that the
executor indeed does not bubble up exceptions in `submit`.
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -411,11 +411,22 @@ private void registerMetricSync() {
* Logic needs to be invoked by write AND flush since we support various semantics.
*/
private void checkAsyncException() throws IOException {
- // reset this exception since we could close the writer later on
Exception e = asyncProducerException;
if (e != null) {
-
- asyncProducerException = null;
+            // In the old version, asyncProducerException was set to null here, which caused
+            // another bug [FLINK-35749]: once asyncProducerException is set to null here, the
+            // MailboxExecutor thread in WriterCallback.onCompletion may also invoke
+            // checkAsyncException and get a chance to set asyncProducerException to null
+            // before flush() and write() see the exception. After that, flush and write can
+            // continue writing the next message batch to Kafka, and the old in-flight batch
+            // with exceptions is lost.
+            // We hope that once an exception is thrown while sending to Kafka, KafkaWriter
+            // fast-fails and triggers a global restart, after which asyncProducerException is
+            // initialized as null again.
Review Comment:
I don't think this change is necessary at all. You can see that we now have
inflated error count metrics in all the tests that you adjusted (and should be
reverted as well).
The issue is not that we reset the exception. It's that it doesn't bubble up
(see below). We actually have a clear contract on the volatile
`asyncProducerException`: it's set in the `onCompletion` and read+reset in the
main thread (which is the mailbox thread). So we never have a chance to
concurrently reset the exception and lose it somehow.
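The contract the reviewer describes can be sketched as follows. This is a simplified stand-in for `KafkaWriter` (the class name `AsyncExceptionHolder` is hypothetical): the volatile field is written from the producer-callback path and read-then-reset only from the mailbox/main thread, so the reset cannot race with another reset and lose an exception.

```java
import java.io.IOException;

public class AsyncExceptionHolder {
    // Set from the producer-callback path; read and reset only from the
    // mailbox (main) thread, so read-then-reset never races with itself.
    private volatile Exception asyncProducerException;

    // Invoked when an asynchronous send completes with a failure.
    void onCompletion(Exception exception) {
        if (exception != null && asyncProducerException == null) {
            asyncProducerException = exception; // keep the first failure
        }
    }

    // Invoked from the mailbox/main thread by write() and flush().
    void checkAsyncException() throws IOException {
        Exception e = asyncProducerException;
        if (e != null) {
            // Safe to reset: only this thread performs the reset.
            asyncProducerException = null;
            throw new IOException("Failed to send data to Kafka", e);
        }
    }

    public static void main(String[] args) {
        AsyncExceptionHolder holder = new AsyncExceptionHolder();
        holder.onCompletion(new RuntimeException("send failed"));
        try {
            holder.checkAsyncException();
        } catch (IOException e) {
            System.out.println("surfaced: " + e.getCause().getMessage());
        }
    }
}
```

Under this contract, resetting the field after reading it is correct; the actual fix is making the exception bubble up (via `execute`) rather than keeping it latched.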
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]