[
https://issues.apache.org/jira/browse/FLINK-21373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Martijn Visser reassigned FLINK-21373:
--------------------------------------
Assignee: Ahmed Hamdy
> Port RabbitMQ Sink to FLIP-143 API
> ----------------------------------
>
> Key: FLINK-21373
> URL: https://issues.apache.org/jira/browse/FLINK-21373
> Project: Flink
> Issue Type: Improvement
> Components: Connectors/ RabbitMQ
> Reporter: Jan Westphal
> Assignee: Ahmed Hamdy
> Priority: Minor
> Labels: auto-unassigned, pull-request-available
> Fix For: 1.12.0
>
>
> *Structure*
> The unified Sink API provides a Writer, a Committer and a GlobalCommitter.
> Right now we don’t see the need to use the Committer and GlobalCommitter as
> the Writer is sufficient to hold up to the consistencies. Since we are in the
> need of asynchronous RabbitMQ callbacks to know whether or not a message was
> published successfully and have to store unacknowledged messages in the
> checkpoint, there would be a large bidirectional communication and state
> exchange overhead between the Writer and the Committer.
> *At-most-once*
> The Writer receives a message from Flink and simply publishes it to RabbitMQ.
> The current RabbitMQ Sink only provides this mode.
> *At-least-once*
> The objective here is, to receive an acknowledgement by RabbitMQ for
> published messages. Therefore, before publishing a message, we store the
> message in a Map with the sequence number as its key. If the message is
> acknowledged by RabbitMQ we can remove it from the Map. If we don’t receive
> an acknowledgement for a certain amount of time (or a RabbitMQ specific so
> called negative acknowledgement) we will try to resend the message when
> doing a checkpoint.
> *Exactly-once*
> On checkpointing we send all messages by Flink in transaction mode to
> RabbitMQ. This way, all the messages get sent or are rolled back on failure.
> All messages that are not sent successfully are written to the checkpoint and
> are tried to be sent with the next checkpoint.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)