tzulitai commented on code in PR #22303:
URL: https://github.com/apache/flink/pull/22303#discussion_r1152666515


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -420,6 +420,7 @@ private void checkAsyncException() throws IOException {
         if (e != null) {
 
             asyncProducerException = null;
+            numRecordsOutErrorsCounter.inc();

Review Comment:
   Actually, this looks like an incorrect redundant increment. It seems like it 
is already being incremented once in the mailbox executor (which I think is the 
right place to do the increment, not here).
   
   I believe the `testErrorPropagationAndErrorsCounter` is not catching this 
incorrect dual increment because in that test you're bypassing the callbacks' 
`onCompletion` logic to directly inject the async exception. This is probably a 
hint that the `@VisibleForTesting setAsyncException()` method is a bad way for 
test injection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to