[https://issues.apache.org/jira/browse/FLINK-30681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17676791#comment-17676791]
Yufan Sheng edited comment on FLINK-30681 at 1/13/23 11:42 PM:
---------------------------------------------------------------
I have checked your code and I understand the problem you ran into. You have
set the delivery guarantee to {{exactly-once}} in [your
code|https://github.com/JacekWislicki/vp-test5/blob/0f4a66100c138945435e2690da8912b14bf0354f/flink/src/main/java/com/example/test5/flink/job/BaseJob.java#L57].
With this setting, the Pulsar sink writes the messages inside a Pulsar
transaction. The sink won't commit the transaction until the Flink job has
completed a checkpoint, and a new transaction is started after each commit. So
the messages written by the sink in your code won't be consumed until
checkpointing is enabled in the Flink job.
In the meantime, new messages written to the topic without a transaction won't
be consumed either, because there are pending transactions.
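To illustrate why later non-transactional messages are held back as well, here is a toy Java model of read-committed visibility. It is a sketch under stated assumptions, not Pulsar's actual code: a consumer is only served entries that precede the first message of any still-open transaction (Pulsar's transaction buffer tracks a similar "max read position"), and messages from aborted transactions are skipped. The class {{ToyTopic}} and all of its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical toy model (not Pulsar's implementation) of read-committed
 * visibility on a topic that mixes transactional and plain messages.
 */
public class ToyTopic {
    /** One log entry; txnId is null for a non-transactional write. */
    private record Entry(String payload, String txnId) {}

    private enum TxnState { OPEN, COMMITTED, ABORTED }

    private final List<Entry> log = new ArrayList<>();
    private final Map<String, TxnState> txns = new HashMap<>();

    public void beginTxn(String txnId) { txns.put(txnId, TxnState.OPEN); }

    public void commitTxn(String txnId) { txns.put(txnId, TxnState.COMMITTED); }

    public void abortTxn(String txnId) { txns.put(txnId, TxnState.ABORTED); }

    /** Appends a message; pass null as txnId for a plain write. */
    public void append(String payload, String txnId) {
        if (txnId != null && txns.get(txnId) != TxnState.OPEN) {
            throw new IllegalStateException("transaction not open: " + txnId);
        }
        log.add(new Entry(payload, txnId));
    }

    /**
     * Messages a read-committed consumer can see: everything before the first
     * entry of a still-open transaction, minus entries of aborted transactions.
     */
    public List<String> readable() {
        List<String> visible = new ArrayList<>();
        for (Entry e : log) {
            if (e.txnId() != null) {
                TxnState state = txns.get(e.txnId());
                if (state == TxnState.OPEN) {
                    break; // nothing at or after this position is served yet
                }
                if (state == TxnState.ABORTED) {
                    continue; // aborted messages are filtered out
                }
            }
            visible.add(e.payload());
        }
        return visible;
    }

    public static void main(String[] args) {
        ToyTopic out = new ToyTopic();
        out.append("direct-1", null);       // like OutTopicProducer
        out.beginTxn("txn-1");              // the sink opens a transaction
        out.append("from-flink", "txn-1");
        out.append("direct-2", null);       // written after the open transaction
        System.out.println(out.readable()); // [direct-1]
        out.commitTxn("txn-1");             // e.g. after a completed checkpoint
        System.out.println(out.readable()); // [direct-1, from-flink, direct-2]
    }
}
```

In this model "direct-2" stays invisible until "txn-1" is committed, even though it was written without a transaction, which matches the blocking described in the comment.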
You can set the {{DeliveryGuarantee}} to {{AT_LEAST_ONCE}} to see if that fixes
this case. Please keep this issue open; I'll add an explanation to the
[connector
documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#delivery-guarantee].
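A minimal sketch of the two options, assuming the Flink 1.15 Pulsar connector API from the affected version (the serializer setter may differ in later connector releases). The service URL, admin URL, topic name, checkpoint interval, and the class name {{OutSinkSketch}} are hypothetical placeholders. Either keep {{EXACTLY_ONCE}} and enable checkpointing so the sink's transactions get committed (this also relies on transactions being enabled on the Pulsar broker), or switch to {{AT_LEAST_ONCE}}, which does not use transactions.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OutSinkSketch {

    // Hypothetical local endpoints and topic; adjust to your setup.
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String ADMIN_URL = "http://localhost:8080";
    private static final String OUT_TOPIC = "persistent://public/default/out";

    static PulsarSink<String> buildSink(DeliveryGuarantee guarantee) {
        return PulsarSink.<String>builder()
                .setServiceUrl(SERVICE_URL)
                .setAdminUrl(ADMIN_URL)
                .setTopics(OUT_TOPIC)
                .setSerializationSchema(
                        PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
                .setDeliveryGuarantee(guarantee)
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DeliveryGuarantee guarantee = DeliveryGuarantee.AT_LEAST_ONCE;

        if (guarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            // The sink commits its transaction only when a checkpoint completes,
            // so exactly-once output stays invisible without checkpointing.
            env.enableCheckpointing(10_000L);
        }

        env.fromElements("a", "b", "c")
                .sinkTo(buildSink(guarantee))
                .name("pulsar-out");
        env.execute("out-sink-sketch");
    }
}
```

Running it needs the Flink and Pulsar connector dependencies on the classpath and a reachable Pulsar cluster.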
> Pulsar-Flink connector corrupts its output topic
> ------------------------------------------------
>
> Key: FLINK-30681
> URL: https://issues.apache.org/jira/browse/FLINK-30681
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Pulsar
> Affects Versions: 1.15.3
> Reporter: Jacek Wislicki
> Priority: Major
>
> When PulsarSink writes a message to its output topic, the topic gets
> permanently corrupted and cannot be used anymore (even with newly created
> subscriptions).
> We have isolated this behaviour in a minimal project that demonstrates the
> problem, available on [GitHub|https://github.com/JacekWislicki/vp-test5]:
> # There are 2 topics: IN and OUT
> # IN is subscribed to by a Flink job, InToOutJob (with PulsarSource), which
> writes to OUT (with PulsarSink)
> # OUT is subscribed to by a Pulsar function, OutReadFunction
> # When we write directly to OUT (e.g., with OutTopicProducer),
> OutReadFunction gets each message from its backlog and processes it with no
> issue (the ledger position updates)
> # When we write to IN (e.g., with InTopicProducer), InToOutJob reads the
> message, processes it and writes to OUT
> # OutReadFunction reads the message and the ledger position updates, but
> nothing else happens (the message is not processed)
> ## Further messages written to OUT are not read, because OUT is blocked on
> the last message from Flink
> ## Truncating OUT does not help, and neither does unsubscribing or creating a
> new subscription
> Reproduced with Pulsar 2.9.1, 2.9.2 and 2.10.2.
> The issue does not occur when we use our temporary custom implementation of
> the old SinkFunction, based on a plain Pulsar producer writing to OUT.