[ 
https://issues.apache.org/jira/browse/FLINK-30681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17676791#comment-17676791
 ] 

Yufan Sheng edited comment on FLINK-30681 at 1/13/23 11:34 PM:
---------------------------------------------------------------

I have checked your code and understand the problem you ran into. You have set 
the delivery guarantee to {{exactly-once}} in [your 
code|https://github.com/JacekWislicki/vp-test5/blob/0f4a66100c138945435e2690da8912b14bf0354f/flink/src/main/java/com/example/test5/flink/job/BaseJob.java#L57].
With this guarantee, the Pulsar sink writes messages inside a Pulsar 
transaction. It commits that transaction when the Flink job completes a 
checkpoint, and then starts a new one. So the messages written by your job 
cannot be consumed until checkpointing is enabled in the Flink job.
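
The commit-on-checkpoint behaviour can be pictured with a small toy model. This is only a sketch: the classes {{Topic}} and {{ExactlyOnceSink}} are hypothetical stand-ins invented for illustration and are not part of Flink or Pulsar.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these classes are hypothetical and are not Flink or Pulsar APIs.
public class TransactionalSinkSketch {

    /** Stands in for the OUT topic; consumers only see committed messages. */
    static class Topic {
        private final List<String> committed = new ArrayList<>();

        List<String> visibleToConsumers() {
            return new ArrayList<>(committed);
        }
    }

    /** Stands in for a sink running with an exactly-once guarantee. */
    static class ExactlyOnceSink {
        private final Topic topic;
        private List<String> openTransaction = new ArrayList<>();

        ExactlyOnceSink(Topic topic) {
            this.topic = topic;
        }

        // Writes go into the open transaction and stay invisible to consumers.
        void write(String message) {
            openTransaction.add(message);
        }

        // Called when a checkpoint completes: commit, then start a new transaction.
        void onCheckpointComplete() {
            topic.committed.addAll(openTransaction);
            openTransaction = new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        Topic out = new Topic();
        ExactlyOnceSink sink = new ExactlyOnceSink(out);

        sink.write("m1");
        sink.write("m2");
        System.out.println("before checkpoint: " + out.visibleToConsumers());

        sink.onCheckpointComplete();
        System.out.println("after checkpoint: " + out.visibleToConsumers());
    }
}
```

If {{onCheckpointComplete()}} is never called, which corresponds to a job without checkpointing, the written messages never become visible in this model.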

You can set the {{DeliveryGuarantee}} to {{AT_LEAST_ONCE}} to see if that fixes 
this case. Please keep this issue open; I'll add an explanation to the 
[connector 
documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#delivery-guarantee].
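
For reference, the suggested setting might look roughly like this in the job setup. It is a sketch, not the reporter's actual code: the service URL, admin URL, topic name, sample elements, and checkpoint interval are placeholder assumptions, and it needs the Flink Pulsar connector on the classpath.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkGuaranteeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Only needed if EXACTLY_ONCE is kept: transactions are committed
        // when a checkpoint completes. The 10 s interval is a placeholder.
        env.enableCheckpointing(10_000L);

        PulsarSink<String> sink = PulsarSink.builder()
                .setServiceUrl("pulsar://localhost:6650")   // placeholder
                .setAdminUrl("http://localhost:8080")       // placeholder
                .setTopics("OUT")                           // placeholder topic name
                .setSerializationSchema(
                        PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
                // AT_LEAST_ONCE writes without a Pulsar transaction.
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        env.fromElements("a", "b", "c").sinkTo(sink);
        env.execute("sink-guarantee-sketch");
    }
}
```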


> Pulsar-Flink connector corrupts its output topic
> ------------------------------------------------
>
>                 Key: FLINK-30681
>                 URL: https://issues.apache.org/jira/browse/FLINK-30681
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: 1.15.3
>            Reporter: Jacek Wislicki
>            Priority: Major
>
> When PulsarSink writes a message to its output topic, the topic gets 
> permanently corrupted and cannot be used anymore (even with newly created 
> subscriptions).
> We have isolated this behaviour to a minimal project demonstrating the 
> problem available on [GitHub|https://github.com/JacekWislicki/vp-test5]:
> # There are 2 topics: IN and OUT
> # A Flink job, InToOutJob, subscribes to IN (with PulsarSource) and writes to 
> OUT (with PulsarSink)
> # A Pulsar function, OutReadFunction, subscribes to OUT
> # When we write directly to OUT (e.g., with OutTopicProducer), 
> OutReadFunction gets each message from its backlog and processes it with no 
> issue (the ledger position updates)
> # When we write to IN (e.g., with InTopicProducer), InToOutJob reads the 
> message, processes it and writes to OUT
> # OutReadFunction reads the message, the ledger position updates, but nothing 
> happens
> ## Further messages written to OUT are not read as OUT is blocked on the last 
> message from Flink
> ## Truncating OUT does not help, neither does unsubscribing or creating a new 
> subscription
> Reproduced with Pulsar 2.9.1, 2.9.2 and 2.10.2.
> The issue does not occur when we use our temporary custom implementation of 
> the old SinkFunction, based on a Pulsar producer writing to OUT.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
