[
https://issues.apache.org/jira/browse/FLINK-20628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-20628:
-----------------------------------
Labels: auto-deprioritized-major pull-request-available stale-assigned
(was: auto-deprioritized-major pull-request-available)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue is assigned but has not
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a
comment updating the community on your progress. If this issue is waiting on
feedback, please consider this a reminder to the committer/reviewer. Flink is a
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone
else may work on it.
> Port RabbitMQ Sources to FLIP-27 API
> ------------------------------------
>
> Key: FLINK-20628
> URL: https://issues.apache.org/jira/browse/FLINK-20628
> Project: Flink
> Issue Type: Improvement
> Components: Connectors/ RabbitMQ
> Reporter: Jan Westphal
> Assignee: RocMarshal
> Priority: Minor
> Labels: auto-deprioritized-major, pull-request-available,
> stale-assigned
>
> *Structure*
> The new RabbitMQ Source will have three components:
> * RabbitMQ enumerator that receives one RabbitMQ Channel Config.
> * RabbitMQ splits contain the RabbitMQ Channel Config
> * RabbitMQ Readers which subscribe to the same RabbitMQ channel and receive
> the messages (automatically load balanced by RabbitMQ).
> *Checkpointing Enumerators*
> The enumerator only needs to checkpoint the RabbitMQ channel config since the
> continuous discovery of new unread/unhandled messages is taken care of by the
> subscribed RabbitMQ readers and RabbitMQ itself.
> *Checkpointing Readers*
> The new RabbitMQ Source needs to ensure that every reader can be checkpointed.
> Since RabbitMQ is non-persistent and cannot be read by offset, a combined
> usage of checkpoints and message acknowledgments is necessary. Until a
> received message is checkpointed by a reader, it will stay in an
> un-acknowledge state. As soon as the checkpoint is created, the messages from
> the last checkpoint can be acknowledged as handled against RabbitMQ and thus
> will be deleted only then. Messages need to be acknowledged one by one as
> messages are handled by each SourceReader individually.
> When deserializing the messages we will make use of the implementation in the
> existing RabbitMQ Source.
> *Message Delivery Guarantees*
> Unacknowledged messages of a reader will be redelivered by RabbitMQ
> automatically to other consumers of the same channel if the reader goes down.
>
> This Source is going to only support at-least-once as this is the default
> RabbitMQ behavior and thus everything else would require changes to RabbitMQ
> itself or would impair the idea of parallelizing SourceReaders.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)