JimmyZZZ commented on code in PR #107:
URL: 
https://github.com/apache/flink-connector-kafka/pull/107#discussion_r1666991488


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -411,11 +411,22 @@ private void registerMetricSync() {
      * Logic needs to be invoked by write AND flush since we support various 
semantics.
      */
     private void checkAsyncException() throws IOException {
-        // reset this exception since we could close the writer later on
         Exception e = asyncProducerException;
         if (e != null) {
-
-            asyncProducerException = null;
+            // In old version, asyncProducerException will be set to null 
here, which causes another
+            // bug [FLINK-35749]
+            // once asyncProducerException is set to null here, the 
MailboxExecutor thread in
+            // WriterCallback.onCompletion may also invoke this method 
checkAsyncException and get
+            // chance to make
+            // asyncProducerException be null before the function flush() and 
write() seeing the
+            // exception.
+            // After that, the flush and writer functions can continue to 
write next message batch
+            // to kafka, the old
+            // in-flight message batch with exceptions get lost.
+            // We hope that once some exceptions thrown during sending kafka, 
KafkaWriter could be
+            // fast-failed and
+            // trigger global restarting, then asyncProducerException will be 
initialized as null
+            // again.

Review Comment:
   As you mentioned in another comment: " If we bubble up the exception the 
task dies without the call-site being able to properly react to it."
   
   I'm a little confused: 
   If we hope to bubble up the exception only in main thread with 
mailboxExecutor.execute, the problem of 
[FLINK-31305](https://issues.apache.org/jira/browse/FLINK-31305) will occur 
again: 
   in-flight checkpoint will be completed, once the mailboxExecutor.execute 
throw the exception a little late, then the job restarts and recovers from this 
error checkpoint
   
   Throwing exception depending on mailboxExecutor.execute is not safe enough 
since mailboxExecutor thread and checkpoint thread are different. So I agree 
with the fix of FLINK-31305 bubbling up the exception in 1.writer() 2. flush() 
3. close(), which can  trigger global fail over ASAP. But in FLINK-31305, the 
exception may be reset by mailboxExecutor.execute, 1/2/3 can not see the 
exception and the in-flight checkpoint with kafka sending problems may be 
completed. That's  why I remove the logic of resetting to null.
   
   If there is any mistake in my understanding, please point it out to me, 
Thanks.  @AHeise 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to