[
https://issues.apache.org/jira/browse/FLINK-20628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rui Fan reassigned FLINK-20628:
-------------------------------
Assignee: (was: RocMarshal)
> Port RabbitMQ Sources to FLIP-27 API
> ------------------------------------
>
> Key: FLINK-20628
> URL: https://issues.apache.org/jira/browse/FLINK-20628
> Project: Flink
> Issue Type: Improvement
> Components: Connectors/ RabbitMQ
> Reporter: Jan Westphal
> Priority: Minor
> Labels: auto-deprioritized-major, pull-request-available,
> stale-assigned
>
> *Structure*
> The new RabbitMQ Source will have three components:
> * A RabbitMQ enumerator, which receives one RabbitMQ Channel Config.
> * RabbitMQ splits, each of which contains the RabbitMQ Channel Config.
> * RabbitMQ readers, which subscribe to the same RabbitMQ channel and receive
> the messages (automatically load balanced by RabbitMQ).
> *Checkpointing Enumerators*
> The enumerator only needs to checkpoint the RabbitMQ channel config since the
> continuous discovery of new unread/unhandled messages is taken care of by the
> subscribed RabbitMQ readers and RabbitMQ itself.
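Since the enumerator state described above is only the channel config, checkpointing it amounts to a serialize/restore round trip. A minimal sketch follows, assuming the config holds a host, a port, and a queue name; the ticket does not list the config fields, and `ChannelConfigSketch` and `EnumeratorStateSerializerSketch` are hypothetical names, not classes from the Flink code base.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical channel config; the fields are assumptions for illustration. */
final class ChannelConfigSketch {
    final String host;
    final int port;
    final String queueName;

    ChannelConfigSketch(String host, int port, String queueName) {
        this.host = host;
        this.port = port;
        this.queueName = queueName;
    }
}

/** Hypothetical serializer: the whole enumerator checkpoint is just the config. */
final class EnumeratorStateSerializerSketch {
    static byte[] serialize(ChannelConfigSketch config) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(config.host);
            out.writeInt(config.port);
            out.writeUTF(config.queueName);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static ChannelConfigSketch deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // Fields are read back in the same order they were written.
            return new ChannelConfigSketch(in.readUTF(), in.readInt(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

No message positions appear in this state, which matches the point that discovery of unread messages is left to the readers and to RabbitMQ.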
> *Checkpointing Readers*
> The new RabbitMQ Source needs to ensure that every reader can be checkpointed.
> Since RabbitMQ is non-persistent and cannot be read by offset, checkpoints
> must be combined with message acknowledgments. A received message stays
> unacknowledged until the reader has included it in a checkpoint. Once that
> checkpoint is created, the messages received since the previous checkpoint
> are acknowledged to RabbitMQ, and only then does RabbitMQ delete them.
> Messages are acknowledged one by one because each SourceReader handles its
> messages individually.
> Message deserialization will reuse the implementation in the existing
> RabbitMQ Source.
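The checkpoint-then-acknowledge flow for readers could look roughly like the sketch below. It is an illustration under assumptions, not the proposed implementation: `AckSink` and `CheckpointedAckTracker` are hypothetical names, the method names only loosely mirror Flink's `snapshotState` and `notifyCheckpointComplete` callbacks, and a real reader would presumably acknowledge through the RabbitMQ Java client's `Channel.basicAck(deliveryTag, false)` instead of the stand-in interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical stand-in for the RabbitMQ channel; it only receives acks. */
interface AckSink {
    void ack(long deliveryTag);
}

/** Holds delivery tags back until the checkpoint that covers them completes. */
final class CheckpointedAckTracker {
    private final AckSink sink;
    private List<Long> pendingSinceLastCheckpoint = new ArrayList<>();
    private final SortedMap<Long, List<Long>> tagsByCheckpoint = new TreeMap<>();

    CheckpointedAckTracker(AckSink sink) {
        this.sink = sink;
    }

    /** Called for every received message; the message stays unacknowledged. */
    void onMessage(long deliveryTag) {
        pendingSinceLastCheckpoint.add(deliveryTag);
    }

    /** Associates all messages received so far with the given checkpoint. */
    void snapshotState(long checkpointId) {
        tagsByCheckpoint.put(checkpointId, pendingSinceLastCheckpoint);
        pendingSinceLastCheckpoint = new ArrayList<>();
    }

    /** Acknowledges, one by one, every message covered by this or an earlier checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        SortedMap<Long, List<Long>> done = tagsByCheckpoint.headMap(checkpointId + 1);
        for (List<Long> tags : done.values()) {
            for (long tag : tags) {
                sink.ack(tag);
            }
        }
        done.clear(); // headMap is a view, so this also removes the entries from tagsByCheckpoint
    }
}
```

Messages that arrive after `snapshotState` are not acknowledged by that checkpoint's completion; they wait for the next one.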
> *Message Delivery Guarantees*
> Unacknowledged messages of a reader will be redelivered by RabbitMQ
> automatically to other consumers of the same channel if the reader goes down.
>
> This Source will support only at-least-once delivery, since that is the
> default RabbitMQ behavior; any other guarantee would require changes to
> RabbitMQ itself or would undermine the parallelization of SourceReaders.
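To illustrate why redelivery yields at-least-once semantics, here is a toy in-memory model of the acknowledge and requeue behavior. `ToyBroker` is entirely hypothetical and is not RabbitMQ's API; it only assumes, as the description does, that messages a failed reader has not acknowledged become available to other consumers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory broker (hypothetical) modelling only ack and requeue. */
final class ToyBroker {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<String, List<String>> unackedByConsumer = new HashMap<>();

    void publish(String message) {
        ready.addLast(message);
    }

    /** Delivers the next ready message to the consumer, or null if none is ready. */
    String deliver(String consumer) {
        String message = ready.pollFirst();
        if (message != null) {
            unackedByConsumer.computeIfAbsent(consumer, k -> new ArrayList<>()).add(message);
        }
        return message;
    }

    /** An acknowledged message is gone for good. */
    void ack(String consumer, String message) {
        List<String> unacked = unackedByConsumer.get(consumer);
        if (unacked != null) {
            unacked.remove(message);
        }
    }

    /** The consumer went down: everything it had not acknowledged is requeued. */
    void consumerDown(String consumer) {
        List<String> unacked = unackedByConsumer.remove(consumer);
        if (unacked != null) {
            // Re-insert at the front in reverse so the original order is kept.
            for (int i = unacked.size() - 1; i >= 0; i--) {
                ready.addFirst(unacked.get(i));
            }
        }
    }
}
```

If a reader had already emitted a message downstream but failed before the covering checkpoint completed, the requeued copy is processed a second time by another reader, so duplicates are possible but loss is not.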