[
https://issues.apache.org/jira/browse/FLINK-27963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17552318#comment-17552318
]
Dmytro commented on FLINK-27963:
--------------------------------
[~martijnvisser], thank you for answering! The RecordTooLargeException is just
an example. The real problem is that the Flink job stops processing all other
messages/data and gets stuck in a Restarting-Failed-Restarting loop after even
a single message fails to be produced due to any runtime exception. For
example, one of our Flink jobs processes ~1k messages per minute through ~10
Flink operators (Functions, Mappers, etc.) with parallelism set between 32
and 134. If any runtime exception occurs, all message processing stops. That
also makes the job an easy target for a DDoS attack. This does not look like
proper failover behavior; there should be a strategy to keep processing data
and handle exceptions wherever they come from, otherwise this is a bug. By the
way, the Kafka broker does not stop processing requests after a
RecordTooLargeException occurs, which is why I think this is an issue on
Flink's end.
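As an interim workaround for the RecordTooLargeException case specifically (a sketch only, not a fix for the restart loop itself), one could filter out oversized records before they reach the KafkaSink, e.g. with `stream.filter(OversizeGuard::fitsInKafka)`. The helper below is plain Java; the `OversizeGuard` name and the hard-coded default `max.request.size` value are illustrative assumptions, since the actual limit is cluster configuration.

```java
import java.nio.charset.StandardCharsets;

public class OversizeGuard {
    // Kafka's default max.request.size (1 MiB); an assumption here,
    // the real value depends on the producer/broker configuration.
    static final int MAX_REQUEST_SIZE = 1_048_576;

    // Returns true if the UTF-8 serialized value would fit under the limit.
    // Note this checks the value only; record overhead, key, and headers
    // also count toward the producer's request size.
    static boolean fitsInKafka(String value) {
        return value.getBytes(StandardCharsets.UTF_8).length <= MAX_REQUEST_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(fitsInKafka("small message"));        // true
        System.out.println(fitsInKafka("x".repeat(1_050_000)));  // false
    }
}
```

Dropping records silently may not be acceptable for every job; routing rejected records to a side output or a dead-letter topic would be the more common variant of the same idea.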
> FlinkRuntimeException in KafkaSink causes a Flink job to hang
> -------------------------------------------------------------
>
> Key: FLINK-27963
> URL: https://issues.apache.org/jira/browse/FLINK-27963
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.0, 1.14.4
> Reporter: Dmytro
> Priority: Major
> Labels: FlinkRuntimeException, KafkaSink
>
> If FlinkRuntimeException occurs in the
> [KafkaSink|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#kafka-sink]
> then the Flink job tries to re-send the failed data and gets into an endless
> "exception -> send again" loop
> *Code sample which throws the FlinkRuntimeException:*
> {code:java}
> int numberOfRows = 1;
> int rowsPerSecond = 1;
> DataStream<String> stream = environment.addSource(
>         new DataGeneratorSource<>(
>                 RandomGenerator.stringGenerator(1050000), // max.message.bytes=1048588
>                 rowsPerSecond,
>                 (long) numberOfRows),
>         TypeInformation.of(String.class))
>     .setParallelism(1)
>     .name("string-generator");
> KafkaSinkBuilder<String> builder = KafkaSink.<String>builder()
>     .setBootstrapServers("localhost:9092")
>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>     .setRecordSerializer(
>         KafkaRecordSerializationSchema.builder()
>             .setTopic("test.output")
>             .setValueSerializationSchema(new SimpleStringSchema())
>             .build());
> KafkaSink<String> sink = builder.build();
> stream.sinkTo(sink).setParallelism(1).name("output-producer"); {code}
> *Exception Stack Trace:*
> {code:java}
> 2022-06-02/14:01:45.066/PDT [flink-akka.actor.default-dispatcher-4] INFO
> output-producer: Writer -> output-producer: Committer (1/1) (a66beca5a05c1c27691f7b94ca6ac025) switched from RUNNING to FAILED on 271b1b90-7d6b-4a34-8116-3de6faa8a9bf @ 127.0.0.1 (dataPort=-1).
> org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false}
>     at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:440) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
>     at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:421) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1050088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)