[
https://issues.apache.org/jira/browse/BEAM-8513?focusedWorklogId=336344&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-336344
]
ASF GitHub Bot logged work on BEAM-8513:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Oct/19 17:50
Start Date: 30/Oct/19 17:50
Worklog Time Spent: 10m
Work Description: drobert commented on pull request #9937: [BEAM-8513]
Allow reads from exchange-bound queue without declaring the exchange
URL: https://github.com/apache/beam/pull/9937
Per [BEAM-8513](https://issues.apache.org/jira/browse/BEAM-8513) it seems
there's no way to use an already-existing exchange, as Beam's RabbitMqIO
will always attempt to declare it, and it does so with `durable=false`. In some
brokers, like qpid, this is fine, but in rabbit, if you attempt to declare a
queue or exchange with different properties than the existing one (e.g. if the
existing one is durable and the declaration is not), the declaration fails.
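To make the failure mode concrete, here is a minimal sketch of the re-declaration check as a toy in-memory model. It is not the RabbitMQ client API or the broker's implementation; `ExchangeRegistry` and `declare` are hypothetical names used only for illustration, and only the `durable` property is modelled.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a broker's exchange registry (hypothetical, for illustration only). */
public class ExchangeRegistry {
  // Maps exchange name to the durability it was first declared with.
  private final Map<String, Boolean> durableByName = new HashMap<>();

  /**
   * Declares an exchange. Re-declaring with the same durability is a no-op;
   * re-declaring with a different durability fails, mirroring rabbit's behaviour.
   */
  public void declare(String name, boolean durable) {
    Boolean existing = durableByName.putIfAbsent(name, durable);
    if (existing != null && existing != durable) {
      throw new IllegalStateException(
          "exchange '" + name + "' already exists with durable=" + existing);
    }
  }
}
```

In this model, declaring `events` as durable and then declaring it again as non-durable throws, which is the situation a reader hits when the exchange already exists as durable and RabbitMqIO declares it with `durable=false`.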
Further, a lot of the documentation about how to work with rabbit and amqp
is misleading or flat out incorrect. The assertion that one 'interacts with a
queue *or* interacts with an exchange' is false; one always: reads from a
queue; registers the queue with the exchange; and binds messages to the queue
(typically based on routing key). The documentation around interacting with a
queue exclusively seems to be oriented around how a 'direct exchange' operates,
and may only work with the default exchange.
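The queue/exchange/binding relationship described above can be sketched with a toy model of a direct exchange. This is an in-memory illustration under simplifying assumptions (a single exchange, exact-match routing keys), not the AMQP client API; `DirectExchangeModel` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy model of one direct exchange with bound queues (hypothetical, for illustration only). */
public class DirectExchangeModel {
  private final Map<String, Queue<String>> queues = new HashMap<>();
  // Routing key -> names of the queues bound with that key.
  private final Map<String, List<String>> bindings = new HashMap<>();

  public void declareQueue(String queue) {
    queues.putIfAbsent(queue, new ArrayDeque<>());
  }

  /** Binds an existing queue to the exchange under the given routing key. */
  public void bind(String queue, String routingKey) {
    if (!queues.containsKey(queue)) {
      throw new IllegalArgumentException("unknown queue: " + queue);
    }
    bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
  }

  /** Publishers talk to the exchange; messages with no matching binding are dropped. */
  public void publish(String routingKey, String message) {
    List<String> targets = bindings.getOrDefault(routingKey, Collections.<String>emptyList());
    for (String queue : targets) {
      queues.get(queue).add(message);
    }
  }

  /** Consumers always read from a queue; returns null when the queue is empty or unknown. */
  public String read(String queue) {
    Queue<String> q = queues.get(queue);
    return q == null ? null : q.poll();
  }
}
```

The point of the sketch is that the reader never addresses the exchange directly: it reads from a queue, and the binding decides which published messages land there.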
The changes in this PR:
- Updates documentation and tests to be more exchange-oriented, as rabbit
has many routing paradigms and exchanges with routing keys are the common thread
between them.
- Introduces an `exchangeDeclare` property in `Read`
- Adds a separate `withExchange` function in `Builder` and enforces exchange
declaration invariants accordingly
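As a rough sketch of what the `exchangeDeclare` flag changes, the helper below lists the broker calls a reader would make during setup. It is an assumption-laden illustration, not the code in this PR: `ReadSetupPlan` and `plan` are hypothetical, and the steps are plain strings named after the `exchangeDeclare`, `queueDeclare` and `queueBind` operations rather than real client calls.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper listing the setup steps for reading from an exchange-bound queue. */
public class ReadSetupPlan {
  public static List<String> plan(
      String exchange, boolean exchangeDeclare, String queue, boolean queueDeclare) {
    List<String> steps = new ArrayList<>();
    if (exchangeDeclare) {
      // Only declare the exchange when asked to; a pre-existing one is left alone.
      steps.add("exchangeDeclare(" + exchange + ")");
    }
    if (queueDeclare) {
      steps.add("queueDeclare(" + queue + ")");
    }
    // Binding happens either way, so the queue still receives routed messages.
    steps.add("queueBind(" + queue + ", " + exchange + ")");
    return steps;
  }
}
```

With `exchangeDeclare` set to false, the plan skips the declaration and goes straight to binding, which is what allows reading from an exchange whose properties differ from the ones RabbitMqIO would declare.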
While writing a test for the case of BEAM-8513 (attempting to force a
failure when declaring a non-durable exchange when a durable one of the same
name already exists) I found qpid did not fail this case. I took the
opportunity to upgrade the qpid broker to the latest, as the previous version
is now over six years old. This did not solve the problem, but a more modern
library seems advantageous.
Full test changes:
- upgrading to the latest version of qpid
- extracting parts of the rabbitmq test so that utilities and test
declarations are moved to separate files/classes for re-use and documentation
- adding more test coverage for some configuration options, including
invariants and different exchange/routing types
R: @jbonofre
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 336344)
Remaining Estimate: 0h
Time Spent: 10m
> RabbitMqIO: Allow reads from exchange-bound queue without declaring the
> exchange
> --------------------------------------------------------------------------------
>
> Key: BEAM-8513
> URL: https://issues.apache.org/jira/browse/BEAM-8513
> Project: Beam
> Issue Type: Improvement
> Components: io-java-rabbitmq
> Environment: testing with DirectRunner
> Reporter: Nick Aldwin
> Priority: Critical
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The RabbitMqIO always declares an exchange if it is configured to read from
> it. This is problematic with pre-existing exchanges (a relatively common
> pattern), as there's no provided configuration for the exchange beyond
> exchange type. (We stumbled on this because RabbitMqIO always declares a
> non-durable exchange, which fails if the exchange already exists as a durable
> exchange)
>
> A solution to this would be to allow RabbitMqIO to read from an exchange
> without declaring it. This pattern is already available for queues via the
> `queueDeclare` flag. I propose an `exchangeDeclare` flag which preserves
> existing behavior except for skipping the call to `exchangeDeclare` before
> binding the queue to the exchange.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)