tzulitai commented on code in PR #22150:
URL: https://github.com/apache/flink/pull/22150#discussion_r1149672884


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -397,6 +413,20 @@ private void registerMetricSync() {
                 });
     }
 
+    /** This logic needs to be invoked by write AND flush since we support various semantics. */
+    private void checkAsyncException() throws IOException {
+        // reset this exception since we could close the writer later on
+        if (asyncProducerException != null) {
+            try {
+                throw new IOException(
+                        "One or more Kafka Producer send requests have encountered exception",
+                        asyncProducerException);
+            } finally {
+                asyncProducerException = null;
+            }

Review Comment:
   ```suggestion
   Exception e = asyncProducerException;
   if (e != null) {
     asyncProducerException = null;
     throw new IOException(
       "One or more Kafka Producer send requests have encountered exception",
       e);
   }
   ```
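
   For context, here is a minimal standalone sketch of the pattern the suggestion relies on: read the field once into a local, clear the field, then throw with the local as the cause, so no `try`/`finally` is needed. `AsyncErrorHolder` and `recordAsyncException` are hypothetical names used only for illustration and are not part of `KafkaWriter`; the `volatile` modifier is likewise an assumption, made because the field would be set from a producer callback thread.

   ```java
   import java.io.IOException;

   /** Hypothetical stand-in for the writer; only the exception hand-off is modeled. */
   class AsyncErrorHolder {
       // Assumed volatile in this sketch because a callback thread would set it.
       private volatile Exception asyncProducerException;

       /** Hypothetical hook that a send callback would call on failure. */
       void recordAsyncException(Exception e) {
           asyncProducerException = e;
       }

       /** Reads the field once, clears it, then throws with the captured cause. */
       void checkAsyncException() throws IOException {
           Exception e = asyncProducerException;
           if (e != null) {
               // Clear before throwing so a later call does not rethrow the same failure.
               asyncProducerException = null;
               throw new IOException(
                       "One or more Kafka Producer send requests have encountered exception",
                       e);
           }
       }
   }
   ```

   Working from the local copy means the null check and the thrown cause refer to the same exception, even if the callback thread updates the field in between.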


