[ https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822161#comment-17822161 ]

Ashish Khatkar edited comment on FLINK-34552 at 2/29/24 2:50 PM:
-----------------------------------------------------------------

Adding an example for this:

Consider two tables.

Table A
Fields: {A, B, C, D}

Table B
Fields: {A, E, F, G}

Query: SELECT A, B, F FROM table A JOIN table B ON field A

Consider a case where the join will contain 1B records for {A, B, C, D, E, F, 
G}, but the number of unique records for the fields we are interested in, 
{A, B, F}, is only 1M.

Now any change that happens in fields C, D, E or G is going to produce records 
-U{A, B, F} +U{A, B, F} (as the join will emit a changelog stream), even though 
the records we are actually interested in have not changed.
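
A minimal Flink SQL sketch of the same scenario (the table names table_a and 
table_b are illustrative, not from a real job):

-- Any update to C, D, E or G on either input re-triggers the join, so the
-- projected changelog gets a -U{A, B, F} / +U{A, B, F} pair even though the
-- {A, B, F} values themselves are unchanged.
SELECT a.A, a.B, b.F
FROM table_a AS a
JOIN table_b AS b
  ON a.A = b.A;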


> Support message deduplication for input data sources
> ----------------------------------------------------
>
>                 Key: FLINK-34552
>                 URL: https://issues.apache.org/jira/browse/FLINK-34552
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API, Table SQL / Runtime
>            Reporter: Sergey Anokhovskiy
>            Priority: Major
>
> My main proposal is to add duplicate-message suppression logic to the Flink 
> Table API so that duplicates coming from the input sources can be suppressed. 
> It could be a user-provided parameter that controls whether duplicates from 
> the input source are suppressed or not. Below I provide more details about my 
> use case and the available approaches.
>  
> I have a Flink job which reads from two keyed Kafka topics and emits messages 
> to a keyed Kafka topic. The Flink job executes the join query:
> SELECT a.id, adata, bdata
> FROM a
> JOIN b
> ON a.id = b.id
>  
> One of the input Kafka topics produces messages with duplicate payloads within 
> a PK in addition to meaningful data. That causes duplicates in the output 
> topic and creates extra load on the downstream services.
>  
> I was looking for a way to suppress duplicates and I found two strategies 
> which don't seem to work for my use case:
>  #  Based on the deduplication window, for example as a Kafka [sink 
> buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46]
>  (sketched after this list). The deduplication window doesn't work well for 
> my case because the interval between duplicates is one day, and I don't want 
> my data to be delayed by using such a big window.
>  
>  #  Using 
> [ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/].
>  Unfortunately, this approach doesn't suit my use case either. Kafka topics 
> a and b are CDC data streams and contain DELETE and REFRESH messages. If 
> DELETE and REFRESH messages come with the same payload, the job will suppress 
> the last message, which will lead to an incorrect output result. If I add 
> message_type to the PARTITION key, then the job will not be able to process 
> message sequences like DELETE->REFRESH->DELETE (with the same payload and 
> PK), because the last message will be suppressed, which again leads to an 
> incorrect output result (see the sketch after this list).
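>
> A sketch of what these two variants would look like (column names follow the 
> query above; the topic, broker and format values, the flush thresholds, and 
> the proctime attribute are illustrative assumptions, not from the actual job).
>
> Variant 1, buffering/reducing rows per key in an upsert-kafka sink; only 
> duplicates arriving within the flush interval are suppressed, so a one-day 
> interval would also hold data back for up to a day:
>
> -- '...' values are placeholders for the real topic/broker settings
> CREATE TABLE output_topic (
>   id STRING,
>   adata STRING,
>   bdata STRING,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = '...',
>   'properties.bootstrap.servers' = '...',
>   'key.format' = 'json',
>   'value.format' = 'json',
>   'sink.buffer-flush.max-rows' = '1000',
>   'sink.buffer-flush.interval' = '24 h'
> );
>
> Variant 2, ROW_NUMBER deduplication on input a with message_type in the 
> PARTITION BY list (proctime is an assumed processing-time attribute on a):
>
> SELECT id, adata, message_type
> FROM (
>   SELECT *,
>     ROW_NUMBER() OVER (
>       PARTITION BY id, adata, message_type
>       ORDER BY proctime ASC) AS row_num
>   FROM a)
> WHERE row_num = 1;
>
> With this partitioning, the second DELETE of a DELETE->REFRESH->DELETE 
> sequence with the same payload falls into the same partition as the first 
> DELETE, gets row_num = 2, and is dropped.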
>  
> Finally, I had to create a separate custom Flink service which reads the 
> output topic of the initial job and suppresses duplicates, keeping the hash 
> of the last processed message for each PK in Flink state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
