[ 
https://issues.apache.org/jira/browse/FLINK-34414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822063#comment-17822063
 ] 

Rafał Trójczak commented on FLINK-34414:
----------------------------------------

Hello All,

I would like to kindly remind you about this issue. If this behavior is indeed 
a bug, the EXACTLY_ONCE delivery guarantee does not work for a Flink-Pulsar 
setup, which makes this technology stack unusable in many applications.

> EXACTLY_ONCE guarantee doesn't work properly for Flink/Pulsar connector 
> ------------------------------------------------------------------------
>
>                 Key: FLINK-34414
>                 URL: https://issues.apache.org/jira/browse/FLINK-34414
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: 1.17.2
>            Reporter: Rafał Trójczak
>            Priority: Major
>
> Using the Pulsar connector for Flink (version 4.1.0-1.17) with a Flink job 
> (version 1.17.2): when an exception is thrown within the job, the job is 
> restarted and resumes from the last checkpoint, but the sink writes more 
> events to the output than it should, even though the EXACTLY_ONCE guarantee 
> is set everywhere. To be more specific, here is my job's flow:
>  * a Pulsar source that reads from the input topic,
>  * a simple processing function,
>  * and a sink that writes to the output topic.
> Here is a fragment of the source creation:
> {code:java}
>     .setDeserializationSchema(Schema.AVRO(inClass), inClass)
>     .setSubscriptionName(subscription)
>     .enableSchemaEvolution()
>     .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
>     .setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true)
>     .setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1)
>     .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, false);
> {code}
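> For context, this is roughly the full source builder around the fragment above 
> (the service/admin URLs, topic, subscription name, and {{InEvent}} class are 
> placeholders, not the actual values from my job; imports are omitted as in the 
> fragments):
> {code:java}
> // Assumed builder context around the source fragment; the URLs, topic,
> // subscription name, and InEvent are placeholders.
> PulsarSource<InEvent> source = PulsarSource.builder()
>     .setServiceUrl("pulsar://localhost:6650")
>     .setAdminUrl("http://localhost:8080")
>     .setTopics("persistent://public/default/input-topic")
>     .setStartCursor(StartCursor.earliest())
>     .setDeserializationSchema(Schema.AVRO(InEvent.class), InEvent.class)
>     .setSubscriptionName("my-subscription")
>     .enableSchemaEvolution()
>     .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
>     .setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true)
>     .setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1)
>     .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, false)
>     .build();
> {code}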
> Here is the fragment of the sink creation:
> {code:java}
>     .setSerializationSchema(Schema.AVRO(outClass), outClass)
>     .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
>     .setConfig(PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE, DeliveryGuarantee.EXACTLY_ONCE)
>     .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
> {code}
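> Similarly, the assumed full sink builder looks roughly like this (the URLs, 
> topic, and {{OutEvent}} class are again placeholders):
> {code:java}
> // Assumed builder context around the sink fragment; the URLs, topic,
> // and OutEvent are placeholders.
> PulsarSink<OutEvent> sink = PulsarSink.builder()
>     .setServiceUrl("pulsar://localhost:6650")
>     .setAdminUrl("http://localhost:8080")
>     .setTopics("persistent://public/default/output-topic")
>     .setSerializationSchema(Schema.AVRO(OutEvent.class), OutEvent.class)
>     .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
>     .setConfig(PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE, DeliveryGuarantee.EXACTLY_ONCE)
>     .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>     .build();
> {code}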
> And here is the Flink environment preparation:
> {code:java}
>     environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>     environment.enableCheckpointing(CHECKPOINTING_INTERVAL, CheckpointingMode.EXACTLY_ONCE);
> {code}
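> Putting the fragments together, the job wiring is essentially the following 
> (the {{ProcessFn}} map function and the job name are simplified placeholders):
> {code:java}
> // Simplified wiring of the job: Pulsar source -> processing function -> Pulsar sink.
> // ProcessFn stands in for the real processing function.
> environment
>     .fromSource(source, WatermarkStrategy.noWatermarks(), "pulsar-source")
>     .map(new ProcessFn())
>     .sinkTo(sink);
> environment.execute("flink-pulsar-exactly-once-job");
> {code}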
> After sending 1000 events to the input topic, I got 1048 events on the 
> output topic.
> I ran the job on my local Kubernetes cluster, using the Flink Kubernetes Operator.
> Here is the MRE (minimal reproducible example) for this problem (note that 
> there is an internal dependency, but it can be commented out together with 
> the code that relies on it): 
> [https://github.com/trojczak/flink-pulsar-connector-problem]



