[
https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822161#comment-17822161
]
Ashish Khatkar edited comment on FLINK-34552 at 2/29/24 2:50 PM:
-----------------------------------------------------------------
Adding an example for this.
Consider two tables:
Table A
Fields: {A, B, C, D}
Table B
Fields: {A, E, F, G}
Query: SELECT A, B, F FROM table A JOIN table B ON field A
Consider a case where the join will contain 1B records for {A, B, C, D, E, F,
G}, but the number of unique records for the fields we are interested in,
{A, B, F}, is only 1M.
Now any change that happens in fields C, D, E, or G will produce records
-U{A, B, F} +U{A, B, F} (as the join emits a changelog stream), but the
records we are actually interested in have effectively not changed.
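As a rough illustration of the suppression the example motivates, here is a minimal sketch in plain Python (not Flink's API, and not any proposed interface): it drops -U/+U retraction pairs from a changelog whose projected fields {A, B, F} are identical, so no-op updates caused by changes in C, D, E, or G are suppressed.

```python
# Illustrative sketch only (plain Python, not Flink): suppress -U/+U
# changelog pairs whose projected columns are unchanged.
# Records are (kind, row) tuples; field names follow the example tables.

def suppress_noop_updates(changelog, projected=("A", "B", "F")):
    """Drop adjacent -U/+U pairs whose projected columns are identical."""
    out = []
    i = 0
    while i < len(changelog):
        kind, row = changelog[i]
        if (kind == "-U" and i + 1 < len(changelog)
                and changelog[i + 1][0] == "+U"):
            before = tuple(row[f] for f in projected)
            after = tuple(changelog[i + 1][1][f] for f in projected)
            if before == after:
                i += 2  # no-op for the projection: drop both records
                continue
        out.append(changelog[i])
        i += 1
    return out

# A change only in field C produces a retraction pair that is a no-op
# for the projected fields {A, B, F} and gets suppressed:
stream = [
    ("+I", {"A": 1, "B": "b", "F": "f", "C": 1}),
    ("-U", {"A": 1, "B": "b", "F": "f", "C": 1}),
    ("+U", {"A": 1, "B": "b", "F": "f", "C": 2}),
]
print(suppress_noop_updates(stream))  # only the +I record survives
```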
> Support message deduplication for input data sources
> ----------------------------------------------------
>
> Key: FLINK-34552
> URL: https://issues.apache.org/jira/browse/FLINK-34552
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API, Table SQL / Runtime
> Reporter: Sergey Anokhovskiy
> Priority: Major
>
> My main proposal is to add duplicate message suppression logic to the
> Flink Table API so that duplicates from the input sources can be
> suppressed. It could be a user-provided parameter indicating whether
> duplicates from the input source should be suppressed. Below I provide
> more details about my use case and the available approaches.
>
> I have a Flink job which reads from two keyed kafka topics and emits
> messages to a keyed kafka topic. The Flink job executes the join query:
> SELECT a.id, adata, bdata
> FROM a
> JOIN b
> ON a.id = b.id
>
> One of the input kafka topics produces messages with duplicate payloads
> within a PK in addition to meaningful data. That causes duplicates in the
> output topic and creates extra load on the downstream services.
>
> I was looking for a way to suppress duplicates and I found two strategies
> which don't seem to work for my use case:
> # Based on a deduplication window, for example as a [kafka sink
> buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46].
> The deduplication window doesn't work well for my case because the
> interval between duplicates is one day and I don't want my data to be
> delayed by using such a big window.
>
> # Using
> [ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/].
> Unfortunately, this approach doesn't suit my use case either. Kafka topics
> a and b are CDC data streams and contain DELETE and REFRESH messages. If
> DELETE and REFRESH messages arrive with the same payload, the job will
> suppress the last message, which leads to an incorrect output result. If
> I add message_type to the PARTITION key, the job will not be able to
> process message sequences like DELETE->REFRESH->DELETE (with the same
> payload and PK), because the last message will be suppressed, which again
> leads to an incorrect output result.
>
> Finally, I had to create a separate custom Flink service which reads the
> output topic of the initial job and suppresses duplicates keeping hashes of
> the last processed message for each PK in the Flink state.
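The hash-per-PK workaround described above can be sketched in plain Python (this is an illustrative stand-in, not the reporter's actual service or Flink keyed state): a message is forwarded only if its payload hash differs from the last hash seen for the same primary key.

```python
# Illustrative sketch only: per-PK deduplication keeping a hash of the
# last processed payload, mirroring the workaround described above.
# A dict stands in for Flink keyed state.
import hashlib
import json

class PerKeyDeduplicator:
    """Forward a message only if its payload differs from the last one
    seen for the same primary key; otherwise suppress it as a duplicate."""

    def __init__(self):
        self._last_hash = {}  # pk -> sha256 of last payload

    def process(self, pk, payload):
        digest = hashlib.sha256(
            json.dumps(payload, sort_keys=True).encode()
        ).hexdigest()
        if self._last_hash.get(pk) == digest:
            return None  # duplicate within this PK: suppress
        self._last_hash[pk] = digest
        return payload

dedup = PerKeyDeduplicator()
print(dedup.process(1, {"adata": "x"}))  # forwarded
print(dedup.process(1, {"adata": "x"}))  # suppressed -> None
print(dedup.process(1, {"adata": "y"}))  # forwarded again
```

Note the state here grows with the number of PKs, which is why keeping it in Flink managed state (rather than in memory) matters for large key spaces.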
--
This message was sent by Atlassian Jira
(v8.20.10#820010)