Alex Sorokoumov created FLINK-31620:
---------------------------------------

             Summary: ReducingUpsertWriter does not flush the wrapped writer
                 Key: FLINK-31620
                 URL: https://issues.apache.org/jira/browse/FLINK-31620
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
            Reporter: Alex Sorokoumov


According to the {{SinkWriter#flush}} 
[javadoc|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java#L43-L47],
 a writer must flush all of its pending records when {{flush}} is called in order to guarantee AT_LEAST_ONCE.

{{upsert-kafka}}'s {{ReducingUpsertWriter}} writes its buffered records into the 
wrapped writer on {{flush}}, but does not flush the wrapped writer itself:
1. SinkWriter#flush implementation - 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java#L88-L91.
2. The actual flush code - 
https://github.com/apache/flink/blob/f3c653ed2e4264315ed83a5b4b2494a7dcc41474/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java#L143-L150.
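
A minimal, self-contained sketch of the problem, assuming simplified stand-in types. {{SimpleWriter}}, {{BufferingInnerWriter}}, {{ReducingWriter}} and the {{flushWrapped}} switch are hypothetical names made up for illustration; this is not the Flink {{SinkWriter}} API or the actual connector code. It only shows why records handed to a wrapped writer can stay pending unless the wrapper also flushes it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReducingFlushSketch {

    /** Hypothetical stand-in for a sink writer; NOT the Flink SinkWriter interface. */
    interface SimpleWriter {
        void write(String key, String value);
        void flush();
    }

    /** Inner writer that keeps records in memory until flush() "sends" them. */
    static class BufferingInnerWriter implements SimpleWriter {
        final List<String> pending = new ArrayList<>();
        final List<String> sent = new ArrayList<>();

        @Override
        public void write(String key, String value) {
            pending.add(key + "=" + value);
        }

        @Override
        public void flush() {
            sent.addAll(pending);
            pending.clear();
        }
    }

    /** Wrapper that keeps only the latest value per key before writing to the wrapped writer. */
    static class ReducingWriter implements SimpleWriter {
        private final SimpleWriter wrapped;
        private final boolean flushWrapped; // hypothetical switch to compare both behaviours
        private final Map<String, String> reduceBuffer = new LinkedHashMap<>();

        ReducingWriter(SimpleWriter wrapped, boolean flushWrapped) {
            this.wrapped = wrapped;
            this.flushWrapped = flushWrapped;
        }

        @Override
        public void write(String key, String value) {
            reduceBuffer.put(key, value); // a later value replaces an earlier one for the same key
        }

        @Override
        public void flush() {
            // Hand the reduced records to the wrapped writer.
            reduceBuffer.forEach(wrapped::write);
            reduceBuffer.clear();
            // Without this call the records only sit in the wrapped writer's own buffer.
            if (flushWrapped) {
                wrapped.flush();
            }
        }
    }

    /** Writes three records, flushes the wrapper, and returns what the inner writer has sent. */
    public static List<String> sentAfterFlush(boolean flushWrapped) {
        BufferingInnerWriter inner = new BufferingInnerWriter();
        ReducingWriter writer = new ReducingWriter(inner, flushWrapped);
        writer.write("a", "1");
        writer.write("a", "2");
        writer.write("b", "1");
        writer.flush();
        return inner.sent;
    }

    public static void main(String[] args) {
        System.out.println("wrapper flush only:    " + sentAfterFlush(false));
        System.out.println("wrapper + inner flush: " + sentAfterFlush(true));
    }
}
```

In this sketch, flushing only the wrapper leaves the reduced records in the inner writer's pending buffer, while also flushing the inner writer sends them. Whether the real fix should call the wrapped writer's {{flush}} in exactly this place is an assumption.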



