Alex Sorokoumov created FLINK-31620:
---------------------------------------

             Summary: ReducingUpsertWriter does not flush the wrapped writer
                 Key: FLINK-31620
                 URL: https://issues.apache.org/jira/browse/FLINK-31620
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
            Reporter: Alex Sorokoumov


According to the {{SinkWriter#flush}} [javadoc|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java#L43-L47], the writer must flush its records to guarantee AT_LEAST_ONCE.

{{upsert-kafka}}'s {{ReducingUpsertWriter}} inserts buffered records into the wrapped writer, but does not flush it:

1. {{SinkWriter#flush}} implementation - https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java#L88-L91
2. The actual flush code - https://github.com/apache/flink/blob/f3c653ed2e4264315ed83a5b4b2494a7dcc41474/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java#L143-L150
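To illustrate the reported behavior, here is a minimal, self-contained sketch of a buffering wrapper that hands its records to an inner writer on flush. It does not use Flink's classes: {{SimpleWriter}}, {{RecordingWriter}}, {{ReducingWrapper}} and the {{flushWrapped}} switch are hypothetical names invented for this example, and the real {{ReducingUpsertWriter}} may differ in detail. The sketch only shows why forwarding buffered records without flushing the wrapped writer can leave them pending.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlushDelegationSketch {

    /** Hypothetical, simplified writer contract; NOT Flink's SinkWriter interface. */
    interface SimpleWriter {
        void write(String key, String value);

        void flush();
    }

    /** Inner writer that keeps records pending until flush() marks them as sent. */
    static class RecordingWriter implements SimpleWriter {
        final List<String> pending = new ArrayList<>();
        final List<String> sent = new ArrayList<>();

        @Override
        public void write(String key, String value) {
            pending.add(key + "=" + value);
        }

        @Override
        public void flush() {
            sent.addAll(pending);
            pending.clear();
        }
    }

    /** Wrapper that keeps only the latest value per key before forwarding (assumed behavior). */
    static class ReducingWrapper implements SimpleWriter {
        private final SimpleWriter wrapped;
        // Hypothetical switch: false mimics the reported bug, true the expected behavior.
        private final boolean flushWrapped;
        private final Map<String, String> buffer = new LinkedHashMap<>();

        ReducingWrapper(SimpleWriter wrapped, boolean flushWrapped) {
            this.wrapped = wrapped;
            this.flushWrapped = flushWrapped;
        }

        @Override
        public void write(String key, String value) {
            buffer.put(key, value); // a later value for the same key replaces the earlier one
        }

        @Override
        public void flush() {
            // Forward the reduced records to the wrapped writer...
            buffer.forEach(wrapped::write);
            buffer.clear();
            // ...and also flush it; without this call the records stay pending downstream.
            if (flushWrapped) {
                wrapped.flush();
            }
        }
    }

    public static void main(String[] args) {
        for (boolean flushWrapped : new boolean[] {false, true}) {
            RecordingWriter inner = new RecordingWriter();
            ReducingWrapper writer = new ReducingWrapper(inner, flushWrapped);
            writer.write("a", "1");
            writer.write("b", "2");
            writer.write("a", "3");
            writer.flush();
            System.out.println(
                    "flushWrapped=" + flushWrapped
                            + " pending=" + inner.pending
                            + " sent=" + inner.sent);
        }
    }
}
```

With {{flushWrapped}} set to false, the records reach the inner writer but remain in its pending list after the wrapper's flush returns, which is the kind of gap the issue describes for AT_LEAST_ONCE.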