RamanVerma commented on code in PR #22150:
URL: https://github.com/apache/flink/pull/22150#discussion_r1136403399


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -397,36 +409,57 @@ private void registerMetricSync() {
                 });
     }
 
+    /** This logic needs to be invoked by write AND flush since we support various semantics. */
+    private void checkAsyncException() throws IOException {
+        // reset this exception since we could close the writer later on
+        Exception e = asyncProducerException.getAndUpdate(ignored -> null);
+        if (e != null) {
+            throw new IOException(
+                    "One or more Kafka Producer send requests have encountered exception", e);
+        }
+    }
+
     private class WriterCallback implements Callback {
         private final MailboxExecutor mailboxExecutor;
         @Nullable private final Consumer<RecordMetadata> metadataConsumer;
 
+        private final AtomicReference<Exception> asyncException;
+
         public WriterCallback(
                 MailboxExecutor mailboxExecutor,
-                @Nullable Consumer<RecordMetadata> metadataConsumer) {
+                @Nullable Consumer<RecordMetadata> metadataConsumer,
+                AtomicReference<Exception> asyncException) {
             this.mailboxExecutor = mailboxExecutor;
             this.metadataConsumer = metadataConsumer;
+            this.asyncException = asyncException;
         }
 
         @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
             if (exception != null) {
                 FlinkKafkaInternalProducer<byte[], byte[]> producer =
                         KafkaWriter.this.currentProducer;
-                mailboxExecutor.execute(
-                        () -> {
-                            numRecordsOutErrorsCounter.inc();
-                            throwException(metadata, exception, producer);
-                        },
-                        "Failed to send data to Kafka");
+
+                // Propagate the first exception since amount of exceptions 
could be large. Need to
+                // do this in Producer IO thread since flush() guarantees that 
the future will
+                // complete. The same guarantee does not hold for tasks 
executed in separate
+                // executor e.g. mailbox executor. flush() needs to have the 
exception immediately
+                // available to fail the checkpoint.
+                asyncException.compareAndSet(
+                        null, decorateException(metadata, exception, 
producer));
+
+                // Need to send metrics through mailbox thread since we are in the producer io
+                // thread
+                mailboxExecutor.submit(
+                        () -> numRecordsOutErrorsCounter.inc(), "Update error metric");

Review Comment:
   Do we also need to throw the exception in the mailboxExecutor thread, as was the case earlier?
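   A minimal sketch (with hypothetical names, not the actual `KafkaWriter` code) of the pattern the diff introduces may help answer this: the producer I/O thread only records the first failure in an `AtomicReference`, and `write()`/`flush()` rethrow it synchronously via `checkAsyncException()`, so throwing again in the mailbox thread would be redundant:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncExceptionSketch {
    private final AtomicReference<Exception> asyncProducerException = new AtomicReference<>();

    // Called from the producer I/O callback: keep only the first exception.
    void onSendFailure(Exception exception) {
        asyncProducerException.compareAndSet(null, exception);
    }

    // Called from write() and flush(): clear and rethrow, so a later
    // close() does not observe a stale failure.
    void checkAsyncException() throws IOException {
        Exception e = asyncProducerException.getAndUpdate(ignored -> null);
        if (e != null) {
            throw new IOException("Kafka Producer send request failed", e);
        }
    }

    public static void main(String[] args) {
        AsyncExceptionSketch sketch = new AsyncExceptionSketch();
        sketch.onSendFailure(new RuntimeException("first"));
        sketch.onSendFailure(new RuntimeException("second")); // dropped: only first kept
        try {
            sketch.checkAsyncException();
            System.out.println("no exception");
        } catch (IOException e) {
            System.out.println("caught: " + e.getCause().getMessage());
        }
    }
}
```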



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -142,7 +143,8 @@
         this.deliveryCallback =
                 new WriterCallback(
                         sinkInitContext.getMailboxExecutor(),
-                        sinkInitContext.<RecordMetadata>metadataConsumer().orElse(null));
+                        sinkInitContext.<RecordMetadata>metadataConsumer().orElse(null),

Review Comment:
   Line 155 checks that sinkInitContext is not null. That check should be moved before this line, since sinkInitContext is already dereferenced here.
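   A minimal sketch of the ordering issue (hypothetical names, not the actual `KafkaWriter` constructor): the null guard must precede the first dereference, otherwise the code throws an unchecked NPE before the guard with its descriptive message ever runs.

```java
import static java.util.Objects.requireNonNull;

public class NullCheckOrderSketch {
    // Hypothetical stand-in for the constructor argument in the diff.
    static String buildCallback(Object sinkInitContext) {
        // Validate before the first use; placing this check after the
        // dereference below would make it unreachable on a null argument.
        Object ctx = requireNonNull(sinkInitContext, "sinkInitContext must not be null");
        return "callback for " + ctx;
    }

    public static void main(String[] args) {
        System.out.println(buildCallback(new Object() {
            @Override
            public String toString() {
                return "ctx";
            }
        }));
    }
}
```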



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to