[ 
https://issues.apache.org/jira/browse/FLINK-23025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368555#comment-17368555
 ] 

Jark Wu edited comment on FLINK-23025 at 6/24/21, 9:23 AM:
-----------------------------------------------------------

Fixed in 
 - master: 6defc99cabb733a4480f2664dbf10a1bb3cdc4e3
 - release-1.13: 28e1f7de58aa534efb533b5de42d2a78f4c4dd96


was (Author: jark):
Fixed in 
 - master: TODO
 - release-1.13: 28e1f7de58aa534efb533b5de42d2a78f4c4dd96

> sink-buffer-max-rows and sink-buffer-flush-interval options produce a lot of 
> duplicates
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-23025
>                 URL: https://issues.apache.org/jira/browse/FLINK-23025
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / Ecosystem
>    Affects Versions: 1.13.1
>            Reporter: Johannes Moser
>            Assignee: Shengkai Fang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.14.0, 1.13.2
>
>
> Using the 
> [sink.buffer-flush.max-rows|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/#sink-buffer-flush-max-rows]
>  and 
> [sink.buffer-flush.interval|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/#sink-buffer-flush-interval]
>  options for an upsert-kafka sink produces a lot of duplicate key/value records 
> in the target Kafka topic. Maybe the {{BufferedUpsertSinkFunction}} should clone 
> the buffered key/value RowData objects, but it doesn’t. When object reuse is 
> enabled, the runtime may hand the sink the same RowData instance for successive 
> records, so buffering references without copying lets later records overwrite 
> earlier buffered ones. It seems that in [line 134|https://github.com/apache/flink/blob/60c7d9e77a6e9d82e0feb33f0d8bc263dddf2fd9/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertSinkFunction.java#L133-L137]
>  the condition should be negated, or the ternary operator's results swapped:
> {code:java}
> this.valueCopier =
>  getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()
>  ? Function.identity()
>  : typeSerializer::copy;{code}
> (The JDBC sink implements the same logic, but with the ternary operator's results 
> swapped.)
>  
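A minimal, self-contained Java sketch of the failure mode described above and of the swapped ternary the reporter suggests. `MutableRow`, `copierFor`, and `bufferRecords` are hypothetical stand-ins, not Flink APIs; the sketch assumes that, with object reuse enabled, the upstream operator hands the sink one mutable instance for every record, and it uses a plain list as the buffer, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class BufferCopyDemo {

    // Hypothetical stand-in for a mutable RowData instance that the runtime may reuse.
    static final class MutableRow {
        String key;
        int value;

        MutableRow(String key, int value) {
            this.key = key;
            this.value = value;
        }

        MutableRow copy() {
            return new MutableRow(key, value);
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    // Swapped ternary as suggested in the report: copy when object reuse is ENABLED,
    // because the caller may overwrite the same instance for the next record;
    // pass the reference through when reuse is disabled.
    static UnaryOperator<MutableRow> copierFor(boolean objectReuseEnabled) {
        return objectReuseEnabled ? MutableRow::copy : UnaryOperator.identity();
    }

    // Simulates an upstream operator that either reuses one row object (object reuse on)
    // or allocates a fresh one per record (object reuse off), feeding a buffering sink.
    static List<String> bufferRecords(boolean objectReuseEnabled, UnaryOperator<MutableRow> copier) {
        List<MutableRow> buffer = new ArrayList<>();
        MutableRow reused = new MutableRow(null, 0);
        String[] keys = {"a", "b", "c"};
        for (int i = 0; i < keys.length; i++) {
            MutableRow row;
            if (objectReuseEnabled) {
                reused.key = keys[i];
                reused.value = i;
                row = reused;
            } else {
                row = new MutableRow(keys[i], i);
            }
            buffer.add(copier.apply(row));
        }
        // Render the buffer contents at "flush" time.
        List<String> out = new ArrayList<>();
        for (MutableRow r : buffer) {
            out.add(r.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        // Behaviour of the quoted line: identity (no copy) while object reuse is on.
        System.out.println("reuse on, no copy: " + bufferRecords(true, UnaryOperator.identity()));
        // Behaviour with the swapped ternary: copy while object reuse is on.
        System.out.println("reuse on, copy:    " + bufferRecords(true, copierFor(true)));
    }
}
```

With object reuse on and no copy, all three buffered entries end up as `c=2`, i.e. duplicates of the last record; copying on buffer insertion restores `a=0`, `b=1`, `c=2`. When reuse is off, every record is already a fresh object, so passing the reference through is safe and avoids an extra copy.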



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
