tzulitai commented on code in PR #22303:
URL: https://github.com/apache/flink/pull/22303#discussion_r1152666515


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -420,6 +420,7 @@ private void checkAsyncException() throws IOException {
         if (e != null) {
 
             asyncProducerException = null;
+            numRecordsOutErrorsCounter.inc();

Review Comment:
   Actually, this looks like an incorrect redundant increment. It seems like it 
is already being incremented once in the mailbox executor (which I think is the 
right place to do the increment, not here).
   
   I believe the `testErrorPropagationAndErrorsCounter` is not catching this 
incorrect dual increment because in that test you're bypassing the callbacks to 
directly inject the async exception. This is probably a hint that the 
`@VisibleForTesting setAsyncException()` method is a bad test injection.



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -420,6 +420,7 @@ private void checkAsyncException() throws IOException {
         if (e != null) {
 
             asyncProducerException = null;
+            numRecordsOutErrorsCounter.inc();

Review Comment:
   On that matter, for leaner test cases additionally I think we should:
   
   1. Break up `testErrorPropagationAndErrorsCounter` test into 2 separate 
tests that individually test error propagation and the error counter, 
separately.
   2. consider removing the `testNumRecordsOutErrorsCounterMetric` test as it 
seems to have overlapping coverage with your new test (or improve that one). 
The `testNumRecordsOutErrorsCounterMetric` is really bloated, though, as it's 
using an awkward way just to trigger an async exception through the callback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to