JimmyZZZ commented on code in PR #107:
URL:
https://github.com/apache/flink-connector-kafka/pull/107#discussion_r1666964753
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -411,11 +411,22 @@ private void registerMetricSync() {
* Logic needs to be invoked by write AND flush since we support various
semantics.
*/
private void checkAsyncException() throws IOException {
- // reset this exception since we could close the writer later on
Exception e = asyncProducerException;
if (e != null) {
-
- asyncProducerException = null;
+ // In old version, asyncProducerException will be set to null
here, which causes another
+ // bug [FLINK-35749]
+ // once asyncProducerException is set to null here, the
MailboxExecutor thread in
+ // WriterCallback.onCompletion may also invoke this method
checkAsyncException and get
+ // chance to make
+ // asyncProducerException be null before the function flush() and
write() seeing the
+ // exception.
+ // After that, the flush and writer functions can continue to
write next message batch
+ // to kafka, the old
+ // in-flight message batch with exceptions get lost.
+ // We hope that once some exceptions thrown during sending kafka,
KafkaWriter could be
+ // fast-failed and
+ // trigger global restarting, then asyncProducerException will be
initialized as null
+ // again.
Review Comment:
Thanks for explaining, and I want to discuss this part with you for more
detail.
As we can see, the read-reset-throw logic is in checkAsyncException(), and
there 4 parent method may invoke checkAsyncException():
1. writer() 2. flush() 3. close() 4. WriterCallback#onCompletion
I agree with "we never have a chance to concurrently reset the exception ",
but 1/2/3 have the chance to reset the exception before 4, since
WriterCallback#onCompletion is in asynchronous threads and assigning exception
to asyncProducerException is not in the mailboxExecutor, which will expose the
exception to 1/2/3 before mailboxExecutor.execute.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]