Hi everyone,

I recently ran into the following exception while handling a failure of a job that uses the FlinkKafkaProducer as its sink:
```
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring #N record(s) for TOPIC-PARTITION:#N ms has passed since batch creation
```

After digging into the source code of FlinkKafkaProducer, I found that, if I read it correctly, FlinkKafkaProducer does not have any kind of backpressure mechanism: incoming records are simply handed to KafkaProducer#send without any throttling (FlinkKafkaProducer.java#L915 <https://github.com/apache/flink/blob/74078914a153a8cc1d6af2c5069e6042432ad13d/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L915>). If the parallelism of the producer is not set according to its upstream throughput, or writes to the leader of a topic partition perform badly, the accumulator inside KafkaProducer fills up with unsent records, which eventually causes record expiration like the exception above.

I saw there was a similar ticket before, FLINK-9083 <https://issues.apache.org/jira/browse/FLINK-9083>, for the Cassandra connector. Shall we have the same improvement for the Kafka connector? Maybe we could also add a maxConcurrentRequests option to FlinkKafkaProducer and use a semaphore to limit the number of in-flight requests? A rough sketch of the idea is below.
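To make the idea concrete, here is a minimal, untested sketch of the semaphore approach, written against the plain KafkaProducer API rather than the actual FlinkKafkaProducer internals; the class name and the maxConcurrentRequests option are only placeholders:

```
import java.util.Properties;
import java.util.concurrent.Semaphore;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch of a semaphore-bounded wrapper around KafkaProducer#send.
public class BoundedKafkaSender<K, V> implements AutoCloseable {

    private final KafkaProducer<K, V> producer;
    private final Semaphore inFlight;
    private final int maxConcurrentRequests;

    public BoundedKafkaSender(Properties props, int maxConcurrentRequests) {
        this.producer = new KafkaProducer<>(props);
        this.inFlight = new Semaphore(maxConcurrentRequests);
        this.maxConcurrentRequests = maxConcurrentRequests;
    }

    // Blocks the caller once maxConcurrentRequests sends are pending, so
    // backpressure propagates upstream instead of piling records up in the
    // producer's accumulator until they expire.
    public void send(ProducerRecord<K, V> record) throws InterruptedException {
        inFlight.acquire();
        producer.send(record, (metadata, exception) -> {
            inFlight.release();
            // a real implementation would record the exception and rethrow it later
        });
    }

    // Waits for all outstanding sends to complete before closing, similar to
    // what flushing on a checkpoint would need to do.
    @Override
    public void close() throws InterruptedException {
        inFlight.acquire(maxConcurrentRequests);
        producer.close();
    }
}
```

If I understand FLINK-9083 correctly, the Cassandra connector fix follows the same pattern: a configurable number of permits acquired before each asynchronous write and released in its completion callback.

Thanks,
Wenhao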