mas-chen commented on code in PR #22303:
URL: https://github.com/apache/flink/pull/22303#discussion_r1153914893


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##########
@@ -198,39 +197,135 @@ public void testCurrentSendTimeMetric() throws Exception {
     }
 
     @Test
-    void testNumRecordsOutErrorsCounterMetric() throws Exception {
+    void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception {
         Properties properties = getKafkaClientConfiguration();
-        final InternalSinkWriterMetricGroup metricGroup =
-                InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup());
 
-        try (final KafkaWriter<Integer> writer =
+        SinkInitContext sinkInitContext =
+                new SinkInitContext(
+                        InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                        timeService,
+                        null);
+        final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
-                        properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) {
-            final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
-            assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext);
+        final Counter numRecordsOutErrors =
+                sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
+
+        triggerProducerException(writer, properties);
+
+        // test flush
+        assertThatCode(() -> writer.flush(false))
+                .hasRootCauseExactlyInstanceOf(ProducerFencedException.class);
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+
+        assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
+                .as("the exception is not thrown again")
+                .doesNotThrowAnyException();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+    }
 
-            writer.write(1, SINK_WRITER_CONTEXT);
-            assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
+    @Test
+    void testWriteAsyncErrorPropagationAndErrorCounter() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
 
-            final String transactionalId = writer.getCurrentProducer().getTransactionalId();
+        SinkInitContext sinkInitContext =
+                new SinkInitContext(
+                        InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                        timeService,
+                        null);
+        final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext);
+        final Counter numRecordsOutErrors =
+                sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
+
+        triggerProducerException(writer, properties);
+        // to ensure that the exceptional send request has completed
+        writer.getCurrentProducer().flush();
+
+        assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
+                .hasRootCauseExactlyInstanceOf(ProducerFencedException.class);
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+
+        assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
+                .as("the exception is not thrown again")
+                .doesNotThrowAnyException();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+    }
 
-            try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                    new FlinkKafkaInternalProducer<>(properties, transactionalId)) {
+    @Test
+    void testMailboxAsyncErrorPropagationAndErrorCounter() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
 
-                producer.initTransactions();
-                producer.beginTransaction();
-                producer.send(new ProducerRecord<byte[], byte[]>(topic, "2".getBytes()));
-                producer.commitTransaction();
-            }
+        SinkInitContext sinkInitContext =
+                new SinkInitContext(
+                        InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                        timeService,
+                        null);
+        final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext);
+        final Counter numRecordsOutErrors =
+                sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
 
-            writer.write(3, SINK_WRITER_CONTEXT);
-            // this doesn't throw exception because the exception is thrown in the Producer IO
-            // thread in unit tests due to the mock mailbox executor, while it would be thrown in
-            // flush() when the real mailbox executor is configured
-            writer.flush(false);
-            writer.prepareCommit();
-            assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+        triggerProducerException(writer, properties);
+        // to ensure that the exceptional send request has completed
+        writer.getCurrentProducer().flush();
+
+        while (sinkInitContext.getMailboxExecutor().tryYield()) {

Review Comment:
   I don't assert on the exception here because, with the TestSinkInitContext, the exception is captured on a different thread, so the unit test never observes it. I think verifying the counter increment is sufficient for this purpose.
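
   For context, the drain-and-assert step that the truncated hunk leads into might look roughly like the following. This is a hypothetical sketch of the pattern, not necessarily the PR's exact code:

   ```java
   // Drain the test mailbox: tryYield() runs one enqueued mail at a time, so
   // looping until it returns false executes the producer callback that the
   // Kafka IO thread enqueued. The exception itself is captured on another
   // thread, hence the assertion is on the error counter, not on a throwable.
   while (sinkInitContext.getMailboxExecutor().tryYield()) {
       // keep yielding until the mailbox is empty
   }
   assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
   ```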


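   Side note for readers comparing against the removed inline code: the new `triggerProducerException` helper presumably fences the writer's current producer by committing a transaction from a second producer that reuses the same transactional id, then writes a record through the writer so a failing send is in flight. A sketch reconstructed from the removed lines (the PR's actual helper may differ):

   ```java
   // Hypothetical reconstruction of the helper, based on the removed test code.
   private void triggerProducerException(KafkaWriter<Integer> writer, Properties properties)
           throws Exception {
       // A second producer with the same transactional id "fences" the writer's
       // producer, so its next transactional operation fails with
       // ProducerFencedException.
       final String transactionalId = writer.getCurrentProducer().getTransactionalId();
       try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
               new FlinkKafkaInternalProducer<>(properties, transactionalId)) {
           producer.initTransactions();
           producer.beginTransaction();
           producer.send(new ProducerRecord<>(topic, "2".getBytes()));
           producer.commitTransaction();
       }
       // Queue a send on the now-fenced producer; the error surfaces asynchronously.
       writer.write(1, SINK_WRITER_CONTEXT);
   }
   ```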
