Thanks for your reply, Piotr!
That is a nice solution! With the buffer restricted through these properties, I
think a maxConcurrentRequests attribute is indeed no longer necessary.
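
For anyone finding this thread later, here is a minimal sketch of how I understand the suggestion. It only builds the Properties object; the class name, method name, broker address and the concrete values are placeholders I made up for illustration, not recommendations. The keys `buffer.memory` and `max.block.ms` are the standard KafkaProducer settings mentioned in the quoted Javadoc.

```java
import java.util.Properties;

public class ProducerBackpressureConfig {

    /**
     * Builds producer properties with a bounded record buffer, so that
     * KafkaProducer#send blocks (back-pressures) once the buffer is full.
     */
    public static Properties boundedBufferConfig(String bootstrapServers,
                                                 long bufferMemoryBytes,
                                                 long maxBlockMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Total bytes the producer may use for buffering unsent records
        // (Kafka's default is 33554432, i.e. 32 MiB).
        props.setProperty("buffer.memory", Long.toString(bufferMemoryBytes));
        // How long send() may block on a full buffer before it throws a
        // TimeoutException (Kafka's default is 60000 ms).
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values: an 8 MiB buffer and a 2 minute blocking limit.
        Properties props =
                boundedBufferConfig("localhost:9092", 8L * 1024 * 1024, 120_000L);
        System.out.println(props.getProperty("buffer.memory"));
    }
}
```

The resulting Properties would then be handed to FlinkKafkaProducer as its producerConfig argument. Raising `max.block.ms` together with lowering `buffer.memory` is my own assumption about how to keep send() blocking rather than failing; the right values depend on the job.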

On Tue, May 4, 2021 at 11:52 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi Wenhao,
>
> As far as I know this is different compared to FLINK-9083, as KafkaProducer
> itself can back pressure writes if internal buffers are exhausted [1].
>
> > The buffer.memory controls the total amount of memory available to the
> producer for buffering. If records are sent faster than they can be
> transmitted to the server then this buffer space will be exhausted. When
> the buffer space is exhausted additional send calls will block. The
> threshold for time to block is determined by max.block.ms after which it
> throws a TimeoutException.
>
> If you want to limit the amount of concurrent requests you can do it via
> reducing the `buffer.memory` option passed to KafkaProducer (via
> FlinkKafkaProducer's "Properties producerConfig").
>
> Piotrek
>
> [1]
>
> https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
>
> On Fri, Apr 23, 2021 at 11:15 AM Wenhao Ji <predator....@gmail.com> wrote:
>
> > Hi everyone,
> >
> > I recently came across the following exception while investigating the
> > failure of a job that uses FlinkKafkaProducer as its sink.
> >
> > ```
> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring #N record(s) for TOPIC-PARTITION:#N ms has passed since batch creation
> > ```
> >
> > After digging into the source code of FlinkKafkaProducer, I found that
> > FlinkKafkaProducer does not have any kind of backpressure mechanism, if I
> > am correct. Incoming records are simply sent using KafkaProducer#send
> > without synchronization (at FlinkKafkaProducer.java#L915
> > <https://github.com/apache/flink/blob/74078914a153a8cc1d6af2c5069e6042432ad13d/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L915>).
> > If the parallelism of the producer is not set correctly for its
> > upstream throughput, or if writes to the leader of a topic partition
> > perform badly, the accumulator in KafkaProducer fills up with unsent
> > records, which eventually causes record expiration like the one above.
> >
> > I have seen that there was a similar ticket, FLINK-9083
> > <https://issues.apache.org/jira/browse/FLINK-9083>, for the Cassandra
> > connector. Shall we make the same improvement to the Kafka connector?
> > Maybe we could also add a maxConcurrentRequests attribute to
> > FlinkKafkaProducer and use a semaphore to limit requests?
> >
> > Thanks,
> > Wenhao
> >
>
