[
https://issues.apache.org/jira/browse/FLINK-34414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837079#comment-17837079
]
Yufan Sheng commented on FLINK-34414:
-------------------------------------
I don't think this is an issue on my side. First of all,
{{.setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)}} only takes effect
on the PulsarSink side. The source does not use transactions: because of their
poor performance, we had to drop transaction support in the source. In your
sample code, you have set
{{.setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true)}}
and {{.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE,
false)}}. This means duplicated messages can be received, because Pulsar can't
get the acknowledgement from the connector and resends the messages. So I think
this is the root cause.
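To make the redelivery argument concrete, here is a toy model in plain Java. It uses no Pulsar or Flink API, and every name in it ({{RedeliverySketch}}, {{ToyBroker}}, {{simulate}}) is hypothetical. It assumes a broker that redelivers everything after the last acknowledged position and a sink that is not transactional, so it only illustrates how missing acknowledgements can lead to duplicates; it is not a reproduction of the reported job.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: no Pulsar or Flink classes are involved, all names are hypothetical.
final class RedeliverySketch {

    /** Hypothetical broker that tracks one acknowledged position for a subscription. */
    static final class ToyBroker {
        private final int messageCount;
        private int ackedUpTo = 0; // message ids in [0, ackedUpTo) count as acknowledged

        ToyBroker(int messageCount) {
            this.messageCount = messageCount;
        }

        /** Message ids the broker would still (re)deliver to a consumer. */
        List<Integer> pending() {
            List<Integer> ids = new ArrayList<>();
            for (int id = ackedUpTo; id < messageCount; id++) {
                ids.add(id);
            }
            return ids;
        }

        void acknowledgeUpTo(int position) {
            ackedUpTo = Math.max(ackedUpTo, position);
        }
    }

    /**
     * The job writes the first writtenBeforeFailure messages, but the broker only
     * receives acknowledgements for the first ackedBeforeFailure of them. After the
     * restart the broker redelivers everything it has not seen acknowledged.
     */
    static List<Integer> simulate(int total, int writtenBeforeFailure, int ackedBeforeFailure) {
        if (ackedBeforeFailure > writtenBeforeFailure || writtenBeforeFailure > total) {
            throw new IllegalArgumentException("expected acked <= written <= total");
        }
        ToyBroker broker = new ToyBroker(total);
        List<Integer> sinkOutput = new ArrayList<>();

        // First attempt: some messages reach the sink before the failure.
        List<Integer> firstDelivery = broker.pending();
        for (int i = 0; i < writtenBeforeFailure; i++) {
            sinkOutput.add(firstDelivery.get(i));
        }
        broker.acknowledgeUpTo(ackedBeforeFailure);

        // Restart: the broker resends every message after the acknowledged position.
        sinkOutput.addAll(broker.pending());
        return sinkOutput;
    }

    public static void main(String[] args) {
        List<Integer> output = simulate(10, 6, 4);
        System.out.println(output.size() + " events written for 10 inputs: " + output);
    }
}
```

In this sketch the two messages that were written but never acknowledged (ids 4 and 5) reach the sink twice, so 10 inputs produce 12 outputs.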
> EXACTLY_ONCE guarantee doesn't work properly for Flink/Pulsar connector
> ------------------------------------------------------------------------
>
> Key: FLINK-34414
> URL: https://issues.apache.org/jira/browse/FLINK-34414
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Pulsar
> Affects Versions: 1.17.2
> Reporter: Rafał Trójczak
> Priority: Major
>
> Using Pulsar connector for Flink (version 4.1.0-1.17) with Flink job (version
> 1.17.2) when there is an exception thrown within the job, the job gets
> restarted, starts from the last checkpoint, but the sink writes to the output
> more events than it should, even though the EXACTLY_ONCE guarantees are set
> everywhere. To be more specific, here is my Job's flow:
> * a Pulsar source that reads from the input topic,
> * a simple processing function,
> * and a sink that writes to the output topic.
> Here is a fragment of the source creation:
> {code:java}
> .setDeserializationSchema(Schema.AVRO(inClass), inClass)
> .setSubscriptionName(subscription)
> .enableSchemaEvolution()
> .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
> .setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true)
> .setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1)
> .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE,
> false);
> {code}
> Here is the fragment of the sink creation:
> {code:java}
> .setSerializationSchema(Schema.AVRO(outClass), outClass)
> .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
> .setConfig(PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE,
> DeliveryGuarantee.EXACTLY_ONCE)
> .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
> {code}
> And here is the Flink environment preparation:
> {code:java}
> environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> environment.enableCheckpointing(CHECKPOINTING_INTERVAL,
> CheckpointingMode.EXACTLY_ONCE);
> {code}
> After sending 1000 events on the input topic, on the output topic I got 1048
> events.
> I ran the job on my local Kubernetes cluster, using Kubernetes Flink Operator.
> Here is the MRE for this problem (mind that there is an internal dependency,
> but it may be commented out together with the code that relies on it):
> [https://github.com/trojczak/flink-pulsar-connector-problem]