[
https://issues.apache.org/jira/browse/BEAM-7434?focusedWorklogId=334695&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334695
]
ASF GitHub Bot logged work on BEAM-7434:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Oct/19 02:37
Start Date: 28/Oct/19 02:37
Worklog Time Spent: 10m
Work Description: drobert commented on pull request #9900: [BEAM-7434]
[BEAM-5895] and [BEAM-5894] Upgrade to rabbit amqp-client 5.x
URL: https://github.com/apache/beam/pull/9900
Primarily addresses BEAM-7434, which notes that the Java RabbitMQ client's
`QueueingConsumer` is deprecated in the 4.x line and slated for removal in
5.x.
This PR:
* upgrades the java library to 5.x (addressing BEAM-5894 and BEAM-5895)
* replaces QueueingConsumer (push api) with `Channel.basicGet` directly
(pull api)
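As a rough illustration of the pull style, here is a stdlib-only sketch. Everything in it is hypothetical: `PullSource`, `Message` and `Reader` are made-up names, not Beam or RabbitMQ classes. `PullSource.get()` stands in for `Channel.basicGet`, which in the real client returns `null` when no message is ready. Deferring acks by delivery tag is an assumption for the example, not a description of this PR's code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.NoSuchElementException;

// Stdlib-only sketch; none of these types come from Beam or the RabbitMQ client.
class PullReaderSketch {

    /** Hypothetical stand-in for a pull call such as Channel.basicGet. */
    interface PullSource {
        /** Returns the next ready message, or null when none is ready. */
        Message get();
    }

    /** Hypothetical message: a delivery tag plus a body. */
    static final class Message {
        final long deliveryTag;
        final String body;

        Message(long deliveryTag, String body) {
            this.deliveryTag = deliveryTag;
            this.body = body;
        }
    }

    /** Reader that performs exactly one pull per advance() call. */
    static final class Reader {
        private final PullSource source;
        private final List<Long> pendingAcks = new ArrayList<>();
        private Message current;

        Reader(PullSource source) {
            this.source = source;
        }

        /** Pulls once; returns false (and clears the current element) if nothing was ready. */
        boolean advance() {
            current = source.get();
            if (current == null) {
                return false;
            }
            // Assumption: acks are deferred until the caller decides to send them.
            pendingAcks.add(current.deliveryTag);
            return true;
        }

        /** Returns the body of the current message; throws if there is none. */
        String getCurrent() {
            if (current == null) {
                throw new NoSuchElementException("no current message");
            }
            return current.body;
        }

        /** Returns the delivery tags awaiting acknowledgement and clears the list. */
        List<Long> drainPendingAcks() {
            List<Long> tags = new ArrayList<>(pendingAcks);
            pendingAcks.clear();
            return tags;
        }
    }

    public static void main(String[] args) {
        // An in-memory queue plays the role of the broker.
        Deque<Message> queue = new ArrayDeque<>();
        queue.add(new Message(1L, "first"));
        queue.add(new Message(2L, "second"));

        Reader reader = new Reader(queue::poll);
        while (reader.advance()) {
            System.out.println(reader.getCurrent());
        }
        System.out.println("to ack: " + reader.drainPendingAcks());
    }
}
```

Each `advance()` costs one call to the source, which is the round trip discussed in the notes; nothing is held locally beyond the current message and the tags still to be acked.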
Notes:
The 4.x use of `QueueingConsumer` was (presumably) motivated by two things: 1)
thread safety between Channel and Consumer (see the history notes in
`QueueingConsumer`); 2) applying backpressure on push delivery by pushing into
a blocking queue. Number 1 is no longer an issue, and I've sidestepped number 2
by using the pull-based API rather than the push-based Consumer API.
This PR makes some tradeoffs. Previously, with the push-based Consumer API,
each shard of the unbounded source could buffer messages internally, which
saves round-trip latency when messages are ready. Both before and after my
changes, the "prefetch" value is left at its default of 0 ("unlimited"), so
messages would be buffered as soon as they became available. The cost of that
buffering is higher memory usage, although it is unlikely to be a practical
concern for most users.
With the pull-based API (`channel.basicGet`), each message pull requires a
round trip to the RabbitMQ server, albeit on an already-open Connection. It
should have the nice property that pulling is 'fairer' than the previous
approach: each shard requests a message only when explicitly prompted by the
Runner, rather than messages potentially sitting unacked/buffered within some
other shard.
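The fairness point can be pictured with a toy model. This is a stdlib-only simulation under simplifying assumptions: the "broker" is an in-memory deque, push delivery is modelled as a plain round-robin into per-shard buffers, and `Shard`, `pushRoundRobin`, `advanceFromBuffer` and `advanceByPull` are hypothetical names that do not appear in Beam or the RabbitMQ client.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model only; it does not talk to RabbitMQ.
class ShardFairnessSketch {

    /** Hypothetical shard: a local buffer (push model) and the messages it has processed. */
    static final class Shard {
        final Deque<String> localBuffer = new ArrayDeque<>();
        final List<String> processed = new ArrayList<>();
    }

    /** Push model, simplified: ready messages are spread round-robin into shard buffers. */
    static void pushRoundRobin(Deque<String> broker, List<Shard> shards) {
        int next = 0;
        while (!broker.isEmpty()) {
            shards.get(next).localBuffer.add(broker.poll());
            next = (next + 1) % shards.size();
        }
    }

    /** Push model: a shard can only process what was already buffered for it. */
    static boolean advanceFromBuffer(Shard shard) {
        String message = shard.localBuffer.poll();
        if (message == null) {
            return false;
        }
        shard.processed.add(message);
        return true;
    }

    /** Pull model: a shard takes one message from the broker, and only when asked. */
    static boolean advanceByPull(Deque<String> broker, Shard shard) {
        String message = broker.poll();
        if (message == null) {
            return false;
        }
        shard.processed.add(message);
        return true;
    }

    static Deque<String> newBroker() {
        return new ArrayDeque<>(Arrays.asList("m1", "m2", "m3", "m4"));
    }

    public static void main(String[] args) {
        // Push: the runner only advances shard A, so B's share stays buffered.
        Shard pushA = new Shard();
        Shard pushB = new Shard();
        pushRoundRobin(newBroker(), Arrays.asList(pushA, pushB));
        while (advanceFromBuffer(pushA)) { }
        System.out.println("push: A processed " + pushA.processed
            + ", B still holds " + pushB.localBuffer);

        // Pull: the same shard A can reach every message because nothing is pre-assigned.
        Shard pullA = new Shard();
        Deque<String> broker = newBroker();
        while (advanceByPull(broker, pullA)) { }
        System.out.println("pull: A processed " + pullA.processed);
    }
}
```

In the push half, `m2` and `m4` sit in shard B's buffer until B is advanced; in the pull half, whichever shard the Runner prompts gets the next message.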
I expect this API not to cause any real-world problems, and throughput
should be high enough without local buffering. If local buffering or minimal
round-trip time is a concern, I would suggest a follow-up PR that makes
prefetch (and perhaps other QoS settings) configurable.
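For readers unfamiliar with prefetch, here is a toy model of what a prefetch limit does. It is a stdlib-only simulation, not client code: `PrefetchSketch` and `deliver` are hypothetical names, and the deques stand in for the broker queue and a consumer's unacked buffer. In the real client the limit is set with `Channel.basicQos`; as far as I know it governs push deliveries to consumers, so a follow-up along these lines would presumably involve a push consumer again.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Toy model only; it does not talk to RabbitMQ.
class PrefetchSketch {

    /**
     * Moves messages from the broker into the consumer's unacked buffer until the
     * prefetch limit is reached. A limit of 0 means "unlimited".
     * Returns how many messages were delivered by this call.
     */
    static int deliver(Deque<String> broker, Deque<String> unacked, int prefetch) {
        int delivered = 0;
        while (!broker.isEmpty() && (prefetch == 0 || unacked.size() < prefetch)) {
            unacked.add(broker.poll());
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        Deque<String> broker = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3", "m4", "m5"));
        Deque<String> unacked = new ArrayDeque<>();

        System.out.println(deliver(broker, unacked, 2)); // prints 2: m1 and m2 are buffered
        unacked.poll();                                  // ack m1, freeing one slot
        System.out.println(deliver(broker, unacked, 2)); // prints 1: only m3 fits
        System.out.println(deliver(broker, unacked, 0)); // prints 2: unlimited, m4 and m5
    }
}
```

A small non-zero limit bounds per-shard memory and how many messages can sit unacked in one shard, at the cost of less local buffering.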
R: @jbonofre
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
Issue Time Tracking
-------------------
Worklog Id: (was: 334695)
Remaining Estimate: 0h
Time Spent: 10m
> RabbitMqIO uses a deprecated API
> --------------------------------
>
> Key: BEAM-7434
> URL: https://issues.apache.org/jira/browse/BEAM-7434
> Project: Beam
> Issue Type: Bug
> Components: io-java-rabbitmq
> Reporter: Nicolas Delsaux
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The RabbitMqIO reader class (UnboundedRabbitMqReader) uses the
> QueueingConsumer, which is marked as deprecated on the RabbitMQ side.
> RabbitMqIO should replace this consumer with the DefaultConsumer provided by
> RabbitMQ.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)