[
https://issues.apache.org/jira/browse/FLINK-27963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17552401#comment-17552401
]
Dmytro edited comment on FLINK-27963 at 6/9/22 7:53 PM:
--------------------------------------------------------
[~martijnvisser], I have a couple of ideas for how any exception coming from the Kafka side could be managed properly by developers who use the Flink Kafka connectors in their projects.
*The ideas:*
1. Allow overriding the [KafkaWriter::deliveryCallback|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L90] field (a private final field in versions 1.15.0 and 1.14.4) with a custom delivery callback; the custom callback could then suppress an [ApiException|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java] if the project architecture requires it (see the first sketch after this list)
2. Call "this.interceptors.onSendError(record, tp, e);" in all catch blocks of the [KafkaProducer::doSend|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L966] method (more details below) and do not call "[callback.onCompletion(null, e);|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1059]" at all; a developer could then manage all exceptions in a custom [producer interceptor|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java] (see the second sketch after this list)
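To illustrate idea 1, here is a minimal sketch of a suppressing delivery callback, assuming KafkaWriter exposed a way to install it (it currently does not; the class name LenientDeliveryCallback and the delegation wiring are hypothetical):
{code:java}
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.ApiException;

// Hypothetical callback a project could install if KafkaWriter allowed
// replacing its private final deliveryCallback field.
public class LenientDeliveryCallback implements Callback {

    private final Callback defaultCallback;

    public LenientDeliveryCallback(Callback defaultCallback) {
        this.defaultCallback = defaultCallback;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception instanceof ApiException) {
            // Suppress the failure instead of bringing the job down;
            // a real implementation would log it and/or bump a metric here.
            return;
        }
        // Keep the writer's default behavior for everything else.
        defaultCallback.onCompletion(metadata, exception);
    }
}
{code}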
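To illustrate idea 2, here is a minimal sketch of such an interceptor, assuming the proposed KafkaProducer change were in place (the class name ErrorHandlingInterceptor is illustrative). Note that the internal ProducerInterceptors::onSendError forwards the error to each user interceptor through its onAcknowledgement hook with a non-null exception, so that is where the handling goes; the interceptor would be registered via the standard "interceptor.classes" producer property, e.g. through KafkaSinkBuilder::setKafkaProducerConfig:
{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor: send errors forwarded by
// ProducerInterceptors::onSendError arrive here with a non-null exception.
public class ErrorHandlingInterceptor implements ProducerInterceptor<byte[], byte[]> {

    @Override
    public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
        return record; // pass the record through unchanged
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // Project-specific handling: log, count, or route the failed
            // record to a dead-letter topic instead of failing the job.
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
{code}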
*Some details:*
Producing/sinking a message goes through the following call stack (flink-connector-kafka v1.15.0):
-> KafkaSink
-> KafkaWriter
creates a delivery callback (private final Callback deliveryCallback) and passes it to the doSend method of the KafkaProducer below
-> KafkaProducer
the doSend method calls KafkaWriter::deliveryCallback::onCompletion(null, exception) if an ApiException occurs
{code:java}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // .......
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
{code}
-> KafkaWriter::onCompletion re-throws the exception via the mailbox executor if the exception input parameter is not null
{code:java}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
        FlinkKafkaInternalProducer<byte[], byte[]> producer = KafkaWriter.this.currentProducer;
        mailboxExecutor.execute(
                () -> {
                    numRecordsOutErrorsCounter.inc();
                    numRecordsSendErrorsCounter.inc();
                    throwException(metadata, exception, producer);
                },
                "Failed to send data to Kafka");
    }
    if (metadataConsumer != null) {
        metadataConsumer.accept(metadata);
    }
}
{code}
> FlinkRuntimeException in KafkaSink causes a Flink job to hang
> -------------------------------------------------------------
>
> Key: FLINK-27963
> URL: https://issues.apache.org/jira/browse/FLINK-27963
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.0, 1.14.4
> Reporter: Dmytro
> Priority: Major
> Labels: FlinkRuntimeException, KafkaSink
>
> If a FlinkRuntimeException occurs in the [KafkaSink|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#kafka-sink], the Flink job tries to re-send the failed data again and gets into the endless loop "exception -> send again".
> *Code sample that throws the FlinkRuntimeException:*
> {code:java}
> int numberOfRows = 1;
> int rowsPerSecond = 1;
> DataStream<String> stream = environment.addSource(
>                 new DataGeneratorSource<>(
>                         RandomGenerator.stringGenerator(1050000), // max.message.bytes=1048588
>                         rowsPerSecond,
>                         (long) numberOfRows),
>                 TypeInformation.of(String.class))
>         .setParallelism(1)
>         .name("string-generator");
> KafkaSinkBuilder<String> builder = KafkaSink.<String>builder()
>         .setBootstrapServers("localhost:9092")
>         .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>         .setRecordSerializer(
>                 KafkaRecordSerializationSchema.builder()
>                         .setTopic("test.output")
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build());
> KafkaSink<String> sink = builder.build();
> stream.sinkTo(sink).setParallelism(1).name("output-producer"); {code}
> *Exception Stack Trace:*
> {code:java}
> 2022-06-02/14:01:45.066/PDT [flink-akka.actor.default-dispatcher-4] INFO output-producer: Writer -> output-producer: Committer (1/1) (a66beca5a05c1c27691f7b94ca6ac025) switched from RUNNING to FAILED on 271b1b90-7d6b-4a34-8116-3de6faa8a9bf @ 127.0.0.1 (dataPort=-1).
> org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false}
>     at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:440) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
>     at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:421) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1050088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. {code}