mas-chen commented on code in PR #22150:
URL: https://github.com/apache/flink/pull/22150#discussion_r1137413489
##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -397,36 +409,57 @@ private void registerMetricSync() {
});
}
+ /** This logic needs to be invoked by write AND flush since we support various semantics. */
+ private void checkAsyncException() throws IOException {
+ // reset this exception since we could close the writer later on
+ Exception e = asyncProducerException.getAndUpdate(ignored -> null);
+ if (e != null) {
+ throw new IOException(
+ "One or more Kafka Producer send requests have encountered exception", e);
+ }
+ }
+
private class WriterCallback implements Callback {
private final MailboxExecutor mailboxExecutor;
@Nullable private final Consumer<RecordMetadata> metadataConsumer;
+ private final AtomicReference<Exception> asyncException;
+
public WriterCallback(
MailboxExecutor mailboxExecutor,
- @Nullable Consumer<RecordMetadata> metadataConsumer) {
+ @Nullable Consumer<RecordMetadata> metadataConsumer,
+ AtomicReference<Exception> asyncException) {
this.mailboxExecutor = mailboxExecutor;
this.metadataConsumer = metadataConsumer;
+ this.asyncException = asyncException;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
FlinkKafkaInternalProducer<byte[], byte[]> producer =
KafkaWriter.this.currentProducer;
- mailboxExecutor.execute(
- () -> {
- numRecordsOutErrorsCounter.inc();
- throwException(metadata, exception, producer);
- },
- "Failed to send data to Kafka");
+
+ // Propagate the first exception since amount of exceptions could be large. Need to
+ // do this in Producer IO thread since flush() guarantees that the future will
+ // complete. The same guarantee does not hold for tasks executed in separate
+ // executor e.g. mailbox executor. flush() needs to have the exception immediately
+ // available to fail the checkpoint.
+ asyncException.compareAndSet(
+ null, decorateException(metadata, exception, producer));
+
+ // Need to send metrics through mailbox thread since we are in the producer io
+ // thread
+ mailboxExecutor.submit(
+ () -> numRecordsOutErrorsCounter.inc(), "Update error metric");
Review Comment:
Good catch. I guess there could be a case where neither write nor flush
is invoked for a while with NONE semantics.
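
To make the concern concrete, here is a minimal standalone sketch of the pattern in this hunk. It is not the actual `KafkaWriter`: `FirstErrorHolder`, `onSendFailure`, and `hasPendingFailure` are hypothetical names used only for illustration. It shows that the first recorded failure stays parked in the `AtomicReference` until something calls `checkAsyncException()`.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the writer; illustrates the pattern only. */
final class FirstErrorHolder {
    private final AtomicReference<Exception> asyncException = new AtomicReference<>();

    /** Would run in the producer IO thread; only the first failure is kept. */
    void onSendFailure(Exception e) {
        asyncException.compareAndSet(null, e);
    }

    /** Would be called from write() and flush(); rethrows and clears the failure. */
    void checkAsyncException() throws IOException {
        Exception e = asyncException.getAndSet(null);
        if (e != null) {
            throw new IOException("One or more send requests failed", e);
        }
    }

    /** True while a failure is recorded but has not been surfaced yet. */
    boolean hasPendingFailure() {
        return asyncException.get() != null;
    }
}
```

In this sketch, if nothing calls `checkAsyncException()` after `onSendFailure(...)`, `hasPendingFailure()` stays true indefinitely, which is the gap described above.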