Sergey Anokhovskiy created FLINK-34552:
------------------------------------------
Summary: Support message deduplication for input data sources
Key: FLINK-34552
URL: https://issues.apache.org/jira/browse/FLINK-34552
Project: Flink
Issue Type: New Feature
Components: Table SQL / API, Table SQL / Runtime
Reporter: Sergey Anokhovskiy
My main proposal is to make duplicate message suppression part of the Flink
Table API, so that duplicates can be suppressed at the input sources. It could
be an optional, user-provided parameter that enables or disables suppression
for a given input source. Below are more details about my use case and the
available approaches.
I have a Flink job that reads from two keyed Kafka topics and emits messages
to a keyed Kafka topic. The Flink job executes the following join query:
SELECT a.id, adata, bdata
FROM a
JOIN b
ON a.id = b.id
One of the input Kafka topics produces, in addition to meaningful data,
messages with a duplicate payload for the same PK. That causes duplicates in
the output topic and creates extra load on the downstream services.
I was looking for a way to suppress duplicates and found two strategies,
neither of which seems to work for my use case:
1. A deduplication window, for example the Kafka sink buffer:
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46
A deduplication window doesn't work well in my case because the interval
between duplicates is one day, and I don't want my data to be delayed by such
a large window.
2. Using ROW_NUMBER
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/
Unfortunately, this approach doesn't suit my use case either. Kafka topics a
and b are CDC data streams and contain DELETE and REFRESH messages. If DELETE
and REFRESH messages arrive with the same payload, the job will suppress the
last one, which leads to an incorrect output result. If I add message_type to
the PARTITION BY key, the job still cannot process message sequences such as
DELETE->REFRESH->DELETE (with the same payload and PK), because the last
message will be suppressed, which again leads to an incorrect output result.
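
The failure mode described above can be reproduced outside Flink. The sketch
below is plain Python, not Flink code: it assumes a keep-first-row style of
deduplication and mimics it by remembering every partition key already seen.
The function name keep_first_row and the (pk, message_type, payload) tuple
layout are hypothetical and chosen only for illustration.

```python
from typing import Callable, Hashable, Iterable, List, Tuple

# Hypothetical message layout: (pk, message_type, payload).
Message = Tuple[int, str, str]


def keep_first_row(
    messages: Iterable[Message],
    partition_key: Callable[[Message], Hashable],
) -> List[Message]:
    """Emit only the first message seen for each partition key."""
    seen = set()
    out = []
    for msg in messages:
        key = partition_key(msg)
        if key not in seen:
            seen.add(key)
            out.append(msg)
    return out


# One PK, identical payload, alternating CDC message types.
stream = [
    (1, "DELETE", "x"),
    (1, "REFRESH", "x"),
    (1, "DELETE", "x"),
]

# Partition by (pk, payload): everything after the first DELETE is dropped.
by_payload = keep_first_row(stream, lambda m: (m[0], m[2]))

# Partition by (pk, payload, message_type): the REFRESH now survives,
# but the final DELETE is still dropped.
by_payload_and_type = keep_first_row(stream, lambda m: (m[0], m[2], m[1]))
```

In both cases the output ends in a different state than the input stream
does, which is the incorrect result described above.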
In the end, I had to create a separate custom Flink service that reads the
output topic of the initial job and suppresses duplicates by keeping message
hashes in state. The initial join job described above still has to process
the duplicates. Would it be better to be able to suppress duplicates at the
input sources?
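
For illustration, the hash-based suppression could look roughly like the
sketch below. It is plain Python rather than Flink code, and it rests on an
assumption not spelled out above: a message counts as a duplicate only if its
hash equals the hash of the most recent message emitted for the same PK. The
function name suppress_consecutive_duplicates and the
(pk, message_type, payload) tuple layout are hypothetical.

```python
import hashlib
from typing import Dict, Iterable, Iterator, Tuple

# Hypothetical message layout: (pk, message_type, payload).
Message = Tuple[int, str, str]


def suppress_consecutive_duplicates(messages: Iterable[Message]) -> Iterator[Message]:
    """Drop a message if it repeats the last emitted message for its PK."""
    # Per-key state: hash of the last emitted (message_type, payload).
    last_hash: Dict[int, bytes] = {}
    for pk, message_type, payload in messages:
        digest = hashlib.sha256(
            f"{message_type}\x00{payload}".encode("utf-8")
        ).digest()
        if last_hash.get(pk) == digest:
            continue  # same type and payload as the previous message for this PK
        last_hash[pk] = digest
        yield (pk, message_type, payload)


stream = [
    (1, "REFRESH", "x"),
    (1, "REFRESH", "x"),  # duplicate, suppressed
    (1, "DELETE", "x"),
    (1, "REFRESH", "x"),
    (1, "DELETE", "x"),   # kept: differs from the previous message for PK 1
    (2, "REFRESH", "x"),  # kept: different PK
]

result = list(suppress_consecutive_duplicates(stream))
```

Because only one hash is stored per key, the state stays small and no
buffering delay is introduced, and a DELETE->REFRESH->DELETE sequence passes
through unchanged.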