Thanks, Piotr, for your reply! That is a nice solution. By restricting the buffer using these properties, I think the maxConcurrentRequests attribute is indeed no longer necessary.
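For anyone else hitting this, a minimal sketch of what "restricting the buffer" could look like. The property keys and the producerConfig parameter are from the Kafka/Flink docs; the concrete values here are just illustrative and would need tuning per job:

```java
import java.util.Properties;

public class ProducerBufferConfig {
    public static void main(String[] args) {
        Properties producerConfig = new Properties();
        // Total bytes KafkaProducer may buffer for unsent records; a smaller
        // value makes KafkaProducer#send block sooner, back-pressuring the sink.
        producerConfig.setProperty("buffer.memory", "16777216"); // 16 MiB (illustrative; Kafka's default is 32 MiB)
        // How long a blocked send() waits for buffer space before throwing
        // a TimeoutException.
        producerConfig.setProperty("max.block.ms", "60000");
        // This Properties object would then be handed to FlinkKafkaProducer
        // through its "Properties producerConfig" constructor parameter.
        System.out.println("buffer.memory=" + producerConfig.getProperty("buffer.memory"));
    }
}
```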
On Tue, May 4, 2021 at 11:52 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> Hi Wenhao,
>
> As far as I know this is different compared to FLINK-9083, as KafkaProducer
> itself can back pressure writes if internal buffers are exhausted [1].
>
> > The buffer.memory controls the total amount of memory available to the
> > producer for buffering. If records are sent faster than they can be
> > transmitted to the server then this buffer space will be exhausted. When
> > the buffer space is exhausted additional send calls will block. The
> > threshold for time to block is determined by max.block.ms after which it
> > throws a TimeoutException.
>
> If you want to limit the amount of concurrent requests you can do it via
> reducing the `buffer.memory` option passed to KafkaProducer (via
> FlinkKafkaProducer's "Properties producerConfig").
>
> Piotrek
>
> [1] https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
>
> pt., 23 kwi 2021 o 11:15 Wenhao Ji <predator....@gmail.com> napisał(a):
> > Hi everyone,
> >
> > I recently came across the following exception when dealing with a job
> > failure, which uses Flink as its sink.
> >
> > ```
> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> > send data to Kafka: Expiring #N record(s) for TOPIC-PARTITION: #N ms has
> > passed since batch creation
> > ```
> >
> > After I dug into the source code of FlinkKafkaProducer, I found out that
> > FlinkKafkaProducer does not have any kind of backpressure mechanism, if I
> > am correct. Incoming records are simply sent using KafkaProducer#send
> > without synchronization (at FlinkKafkaProducer.java#L915
> > <https://github.com/apache/flink/blob/74078914a153a8cc1d6af2c5069e6042432ad13d/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L915>).
> > If the parallelism of the producer is not correctly set according to its
> > upstream throughput, or the write to the leader of a topic partition
> > performs badly, the accumulator in KafkaProducer will fill up with unsent
> > records and finally cause record expiration like the one above.
> >
> > I have seen there was a similar ticket, FLINK-9083
> > <https://issues.apache.org/jira/browse/FLINK-9083>, before, which is for
> > the Cassandra connector. Shall we have the same improvement for the Kafka
> > connector? Maybe we can also have a maxConcurrentRequests attribute in
> > FlinkKafkaProducer and use a semaphore to limit requests?
> >
> > Thanks,
> > Wenhao
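For the archives, the semaphore idea proposed above would look roughly like the sketch below. This is a self-contained illustration, not connector code; the class name, the thread pool standing in for the broker, and the simulated latency are all hypothetical:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedAsyncSender {
    private final Semaphore permits;                     // caps outstanding sends
    final ExecutorService broker = Executors.newFixedThreadPool(4); // stand-in for async I/O
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger inFlightMax = new AtomicInteger();

    BoundedAsyncSender(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    // Blocks once maxConcurrentRequests sends are outstanding, giving the
    // same kind of backpressure FLINK-9083 added for the Cassandra sink.
    void send(String record) throws InterruptedException {
        permits.acquire();
        int now = inFlight.incrementAndGet();
        inFlightMax.accumulateAndGet(now, Math::max);
        broker.submit(() -> {
            try {
                Thread.sleep(5); // simulate broker latency
            } catch (InterruptedException ignored) {
            } finally {
                inFlight.decrementAndGet();
                permits.release(); // the "completion callback" frees a slot
            }
        });
    }

    public static void main(String[] args) throws Exception {
        BoundedAsyncSender sender = new BoundedAsyncSender(8);
        for (int i = 0; i < 100; i++) {
            sender.send("record-" + i);
        }
        sender.broker.shutdown();
        sender.broker.awaitTermination(10, TimeUnit.SECONDS);
        // Never exceeds the configured limit of 8.
        System.out.println("max in flight: " + sender.inFlightMax.get());
    }
}
```

In the real connector the release would happen in the Callback passed to KafkaProducer#send, but as Piotr notes, tuning buffer.memory achieves a similar bound without new API surface.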