[ 
https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Anokhovskiy updated FLINK-34552:
---------------------------------------
    Description: 
My main proposal is to add duplicate message suppression logic to the Flink 
Table API, so that duplicates can be suppressed directly at the input sources. 
It could be a parameter provided by the user to control whether duplicates from 
an input source should be suppressed or not. Below I provide more details about 
my use case and the approaches that are available.
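
To make the proposal concrete, below is a minimal, purely hypothetical sketch of 
how such a parameter could surface to users as a table option. The 
'scan.deduplicate' options do not exist in Flink today (a current release would 
reject them as unsupported); the connector, topic and schema are placeholders as 
well.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SourceDedupProposalSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical DDL: 'scan.deduplicate*' is NOT an existing Flink option;
        // it only illustrates the user-facing switch proposed in this ticket.
        tEnv.executeSql(
                "CREATE TABLE a (" +
                "  id STRING," +
                "  adata STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'topic-a'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'," +
                // the two options below are the hypothetical part of the sketch:
                "  'scan.deduplicate' = 'true'," +
                "  'scan.deduplicate.compare' = 'payload'" +
                ")");
    }
}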

 

I have a Flink job which reads from two keyed Kafka topics and emits messages 
to a keyed Kafka topic. The Flink job executes the following join query:

SELECT a.id, adata, bdata
FROM a
JOIN b
ON a.id = b.id

 

One of the input Kafka topics produces messages whose payload duplicates an 
earlier message with the same primary key (PK), in addition to meaningful data. 
This causes duplicates in the output topic and creates extra load on the 
downstream services.

 

I was looking for a way to suppress duplicates and found two strategies, 
neither of which seems to work for my use case (a sketch of each strategy 
follows this list):
 #  Based on a deduplication window, for example the Kafka [sink 
buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46].
 A deduplication window doesn't work well for my case because the interval 
between duplicates is one day, and I don't want my data to be delayed by such a 
big window.

 
 #  Using 
[ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/].
 Unfortunately, this approach doesn't suit my use case either. Kafka topics a 
and b are CDC data streams and contain DELETE and REFRESH messages. If a DELETE 
and a REFRESH message arrive with the same payload, the job suppresses the 
later message, which leads to an incorrect output result. If I add message_type 
to the PARTITION key, the job cannot correctly process message sequences such 
as DELETE->REFRESH->DELETE (with the same payload and PK), because the last 
message is still suppressed, again producing an incorrect result.
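
For reference, here is a minimal sketch of both strategies. The 
'sink.buffer-flush.*' options and the ROW_NUMBER pattern are taken from the 
Flink documentation; the table schemas, topic names and the datagen-backed 
table a (with a processing-time attribute) are illustrative assumptions only.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DedupStrategiesSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Illustrative source table standing in for CDC topic 'a'.
        tEnv.executeSql(
                "CREATE TABLE a (" +
                "  id STRING," +
                "  adata STRING," +
                "  message_type STRING," +
                "  proctime AS PROCTIME()" +
                ") WITH ('connector' = 'datagen')");

        // Strategy 1: upsert-kafka sink buffer (ReducingUpsertSink). It only
        // reduces rows per key within one flush interval, so covering duplicates
        // that arrive a day apart would require an interval of about '1 d',
        // delaying every record by up to a day.
        tEnv.executeSql(
                "CREATE TABLE joined_output (" +
                "  id STRING, adata STRING, bdata STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'joined-output'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'," +
                "  'sink.buffer-flush.max-rows' = '1000'," +
                "  'sink.buffer-flush.interval' = '1 s'" +
                ")");

        // Strategy 2: ROW_NUMBER deduplication keeping the first row per
        // (PK, payload). A later DELETE or REFRESH with an identical payload
        // gets row_num > 1 and is suppressed; adding message_type to PARTITION BY
        // still suppresses the second DELETE of a DELETE -> REFRESH -> DELETE
        // sequence with the same payload.
        tEnv.executeSql(
                "SELECT id, adata " +
                "FROM (" +
                "  SELECT *, ROW_NUMBER() OVER (" +
                "    PARTITION BY id, adata ORDER BY proctime ASC) AS row_num " +
                "  FROM a) " +
                "WHERE row_num = 1");
    }
}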

 

Finally, I had to create a separate custom Flink service which reads the output 
topic of the initial job and suppresses duplicates by keeping a hash of the 
last processed message for each PK in Flink state.
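
For completeness, here is a minimal sketch of the kind of state-based 
suppression that service performs, assuming a stream of string payloads already 
keyed by PK; the class name, the use of hashCode() and the surrounding wiring 
are illustrative assumptions, not the actual service.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Drops a record when its payload is identical to the last payload emitted for
// the same PK; the per-key hash lives in Flink keyed state.
public class SuppressUnchangedPayload
        extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Integer> lastHash;

    @Override
    public void open(Configuration parameters) {
        lastHash = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-payload-hash", Integer.class));
    }

    @Override
    public void processElement(String payload, Context ctx, Collector<String> out)
            throws Exception {
        int hash = payload.hashCode();   // a stronger hash (e.g. SHA-256) could be used to reduce collisions
        Integer previous = lastHash.value();
        if (previous == null || previous != hash) {
            lastHash.update(hash);       // remember the newly emitted payload
            out.collect(payload);        // first message for this PK, or payload changed
        }
        // otherwise: same payload as the last emitted message for this PK -> drop it
    }
}

It would be applied as something like 
outputStream.keyBy(pkExtractor).process(new SuppressUnchangedPayload()), where 
pkExtractor derives the PK from the message (also an assumption here).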

  was:
My main proposal is to have duplicate message suppression logic as a part of 
the Flink Table API, to be able to suppress duplicates from the input sources. 
It might be a parameter provided by the user if they want to suppress 
duplicates from the input source or not. Below I provide more details about my 
use case and the available approaches.

I have a Flink job which reads from two keyed Kafka topics and emits messages 
to a keyed Kafka topic. The Flink job executes the join query:
SELECT a.id, adata, bdata
FROM a
JOIN b
ON a.id = b.id

One of the input Kafka topics produces messages with duplicate payloads within 
a PK, in addition to meaningful data. That causes duplicates in the output 
topic and creates extra load on the downstream services.

I was looking for a way to suppress duplicates and I found two strategies 
which don't seem to work for my use case:
1. Based on a deduplication window, as in the Kafka sink buffer, for example: 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46
A deduplication window doesn't work well for my case because the interval 
between duplicates is one day and I don't want my data to be delayed if I use 
such a big window.

2. Using ROW_NUMBER: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/
Unfortunately, this approach doesn't suit my use case either. Kafka topics a 
and b are CDC data streams and contain DELETE and REFRESH messages. If DELETE 
and REFRESH messages are coming with the same payload, the job will suppress 
the last message, which will lead to an incorrect output result. If I add 
message_type to the PARTITION key then the job will not be able to process 
message sequences like DELETE->REFRESH->DELETE (with the same payload and PK), 
because the last message will be suppressed, which will lead to an incorrect 
output result.

Finally, I had to create a separate custom Flink service which reads the output 
topic of the initial job and suppresses duplicates, keeping message hashes in 
the state. The initial join job, described above, still has to process 
duplicates. Would it be better to be able to suppress duplicates from the input 
sources?

 


> Support message deduplication for input data sources
> ----------------------------------------------------
>
>                 Key: FLINK-34552
>                 URL: https://issues.apache.org/jira/browse/FLINK-34552
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API, Table SQL / Runtime
>            Reporter: Sergey Anokhovskiy
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
