RamanVerma commented on code in PR #22150:
URL: https://github.com/apache/flink/pull/22150#discussion_r1136403399
##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -397,36 +409,57 @@ private void registerMetricSync() {
});
}
+ /** This logic needs to be invoked by write AND flush since we support various semantics. */
+ private void checkAsyncException() throws IOException {
+ // reset this exception since we could close the writer later on
+ Exception e = asyncProducerException.getAndUpdate(ignored -> null);
+ if (e != null) {
+ throw new IOException(
+ "One or more Kafka Producer send requests have encountered exception", e);
+ }
+ }
+
private class WriterCallback implements Callback {
private final MailboxExecutor mailboxExecutor;
@Nullable private final Consumer<RecordMetadata> metadataConsumer;
+ private final AtomicReference<Exception> asyncException;
+
public WriterCallback(
MailboxExecutor mailboxExecutor,
- @Nullable Consumer<RecordMetadata> metadataConsumer) {
+ @Nullable Consumer<RecordMetadata> metadataConsumer,
+ AtomicReference<Exception> asyncException) {
this.mailboxExecutor = mailboxExecutor;
this.metadataConsumer = metadataConsumer;
+ this.asyncException = asyncException;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
FlinkKafkaInternalProducer<byte[], byte[]> producer =
KafkaWriter.this.currentProducer;
- mailboxExecutor.execute(
- () -> {
- numRecordsOutErrorsCounter.inc();
- throwException(metadata, exception, producer);
- },
- "Failed to send data to Kafka");
+
+ // Propagate the first exception since amount of exceptions could be large. Need to
+ // do this in Producer IO thread since flush() guarantees that the future will
+ // complete. The same guarantee does not hold for tasks executed in separate
+ // executor e.g. mailbox executor. flush() needs to have the exception immediately
+ // available to fail the checkpoint.
+ asyncException.compareAndSet(
+ null, decorateException(metadata, exception, producer));
+
+ // Need to send metrics through mailbox thread since we are in the producer io
+ // thread
+ mailboxExecutor.submit(
+ () -> numRecordsOutErrorsCounter.inc(), "Update error metric");
Review Comment:
Do we still need to throw the exception in the mailboxExecutor thread as well,
as was the case earlier?
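
For context on the question above, the pattern introduced in this hunk can be sketched in isolation. This is a minimal illustration, not the Flink code: `AsyncErrorHolder`, `record`, and `checkAndReset` are hypothetical names, and the exception message is made up. It assumes one thread records failures (like the producer IO thread) and another thread checks for them later (like write() or flush()).

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical holder, not a Flink class: keeps the first async failure for a later check. */
class AsyncErrorHolder {
    private final AtomicReference<Exception> firstError = new AtomicReference<>();

    /** Called from the callback thread; only the first exception is kept. */
    void record(Exception e) {
        firstError.compareAndSet(null, e);
    }

    /** Called from the writer thread; clears the stored error before rethrowing it. */
    void checkAndReset() throws IOException {
        Exception e = firstError.getAndSet(null);
        if (e != null) {
            throw new IOException("Async send failed", e);
        }
    }
}
```

In a sketch like this, the failure only surfaces when the checking thread next calls `checkAndReset()`; nothing is thrown from any other thread unless that is added separately.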
##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -142,7 +143,8 @@
this.deliveryCallback =
new WriterCallback(
sinkInitContext.getMailboxExecutor(),
- sinkInitContext.<RecordMetadata>metadataConsumer().orElse(null));
+ sinkInitContext.<RecordMetadata>metadataConsumer().orElse(null),
Review Comment:
Line 155 checks that sinkInitContext is not null. That check should be moved
before this line, since sinkInitContext is already dereferenced here.
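
A minimal sketch of the ordering being suggested, using hypothetical stand-in types (`SketchInitContext` and `SketchWriter` are not Flink classes) and `Objects.requireNonNull` in place of whatever null check the real constructor uses:

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for the sink init context; not the Flink interface. */
interface SketchInitContext {
    Optional<String> metadataConsumer();
}

/** Hypothetical writer showing the null check placed before the first dereference. */
class SketchWriter {
    private final String consumer;

    SketchWriter(SketchInitContext context) {
        // Validate first, so a null argument fails with a descriptive message
        // instead of an unlabeled NullPointerException on the call below.
        Objects.requireNonNull(context, "context");
        this.consumer = context.metadataConsumer().orElse(null);
    }

    String consumer() {
        return consumer;
    }
}
```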