[ https://issues.apache.org/jira/browse/FLINK-34414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822063#comment-17822063 ]
Rafał Trójczak commented on FLINK-34414:
----------------------------------------
Hello All,
I would like to kindly remind you about this issue. If this is in fact a bug,
the EXACTLY_ONCE delivery guarantee does not hold for the Flink/Pulsar setup,
which makes this technology stack unusable for many applications.
> EXACTLY_ONCE guarantee doesn't work properly for Flink/Pulsar connector
> ------------------------------------------------------------------------
>
> Key: FLINK-34414
> URL: https://issues.apache.org/jira/browse/FLINK-34414
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Pulsar
> Affects Versions: 1.17.2
> Reporter: Rafał Trójczak
> Priority: Major
>
> When a Flink job (version 1.17.2) that uses the Pulsar connector for Flink
> (version 4.1.0-1.17) throws an exception, the job is restarted from the last
> checkpoint, but the sink writes more events to the output topic than it
> should, even though the EXACTLY_ONCE guarantee is set everywhere. To be more
> specific, here is my job's flow:
> * a Pulsar source that reads from the input topic,
> * a simple processing function,
> * and a sink that writes to the output topic.
> Here is a fragment of the source creation:
> {code:java}
> .setDeserializationSchema(Schema.AVRO(inClass), inClass)
> .setSubscriptionName(subscription)
> .enableSchemaEvolution()
> .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
> .setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true)
> .setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1)
> .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, false);
> {code}
> Here is the fragment of the sink creation:
> {code:java}
> .setSerializationSchema(Schema.AVRO(outClass), outClass)
> .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
> .setConfig(PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE, DeliveryGuarantee.EXACTLY_ONCE)
> .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
> {code}
> And here is the Flink environment preparation:
> {code:java}
> environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> environment.enableCheckpointing(CHECKPOINTING_INTERVAL, CheckpointingMode.EXACTLY_ONCE);
> {code}
> After sending 1000 events to the input topic, I got 1048 events on the output
> topic.
> I ran the job on my local Kubernetes cluster, using the Flink Kubernetes
> Operator.
> Here is the MRE for this problem (note that it has an internal dependency,
> which can be commented out together with the code that relies on it):
> [https://github.com/trojczak/flink-pulsar-connector-problem]
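For anyone trying to reproduce the numbers above (1000 events in, 1048 events out), here is a minimal sketch of how the surplus could be measured once the output topic has been read back. It assumes each event carries a unique ID, which the report does not state; `DuplicateCheck`, `countById` and `surplus` are hypothetical helper names and not part of Flink or the Pulsar connector.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for checking the output topic contents;
// not part of Flink or the Pulsar connector.
final class DuplicateCheck {

    /** Counts how many times each event ID occurs in the output. */
    static Map<String, Integer> countById(List<String> eventIds) {
        Map<String, Integer> counts = new HashMap<>();
        for (String id : eventIds) {
            counts.merge(id, 1, Integer::sum);
        }
        return counts;
    }

    /** Surplus events: total events read minus the number of distinct IDs. */
    static int surplus(List<String> eventIds) {
        return eventIds.size() - countById(eventIds).size();
    }
}
```

If the assumption about unique IDs holds, `surplus` should return 0 under a working EXACTLY_ONCE setup; for the run described above it would be expected to return 48, provided all 1000 distinct events arrived.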