[
https://issues.apache.org/jira/browse/BEAM-7434?focusedWorklogId=337865&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337865
]
ASF GitHub Bot logged work on BEAM-7434:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Nov/19 23:25
Start Date: 03/Nov/19 23:25
Worklog Time Spent: 10m
Work Description: drobert commented on pull request #9977: [BEAM-7434]
[BEAM-5895] and [BEAM-5894] Fix upgrade to rabbit amqp-client 5.x
URL: https://github.com/apache/beam/pull/9977
In https://github.com/apache/beam/pull/9900 I migrated the rabbitmq amqp
client from the 4.x line to the 5.x line. This upgrade removed the deprecated
`QueueingConsumer`, so I switched from a push-based to a pull-based API.
It turns out this change altered the semantics. `channel.basicGet` (pull) is a
blocking call. If no messages are delivered to the caller for a long period of
time (for example, because nothing has been published to rabbit for a while),
the Runner may detect that the reader is stuck and fail with:
> java.io.IOException: Failed to advance source: ...
So it seems imperative to use a non-blocking read.
This PR implements a slightly simpler `QueueingConsumer` replacement. This
version uses a single-element `LinkedBlockingQueue` for fairness: if N
consumers are in use (from `split`), each will hold at most one unprocessed
message. It exposes a `poll` API to the caller, so that we effectively get a
pull-based API with get-with-timeout semantics on top of a push-based API.
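
To make the single-element handoff idea concrete, here is a minimal sketch that uses only `java.util.concurrent`. It is not the code from this PR: the class name `HandoffConsumer` and the method `handleDelivery` are hypothetical stand-ins, and a plain thread plays the role of the amqp client's delivery callback.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a QueueingConsumer replacement; not the actual Beam code. */
class HandoffConsumer<T> {
  // Capacity 1: at most one delivered-but-unprocessed message per consumer.
  private final BlockingQueue<T> handoff = new LinkedBlockingQueue<>(1);

  /** Push side (assumed to be called from a delivery callback). Blocks while the slot is full. */
  void handleDelivery(T message) throws InterruptedException {
    handoff.put(message);
  }

  /** Pull side (assumed to be called from the reader). Returns null if nothing arrives in time. */
  T poll(long timeout, TimeUnit unit) throws InterruptedException {
    return handoff.poll(timeout, unit);
  }

  public static void main(String[] args) throws Exception {
    HandoffConsumer<String> consumer = new HandoffConsumer<>();

    // Nothing has been delivered yet, so this times out instead of blocking forever.
    System.out.println(consumer.poll(10, TimeUnit.MILLISECONDS));

    // Simulated push side: the second delivery waits until the first is polled.
    Thread producer = new Thread(() -> {
      try {
        consumer.handleDelivery("m1");
        consumer.handleDelivery("m2");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    System.out.println(consumer.poll(1, TimeUnit.SECONDS));
    System.out.println(consumer.poll(1, TimeUnit.SECONDS));
    producer.join();
  }
}
```

The reader-facing call is the timed `poll`, which returns `null` on an empty queue, so a caller such as `advance` can report "no message yet" rather than hang.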
Note that amqp `prefetch` cannot be used to simulate this. While prefetch
*will* cap the number of messages delivered to the client at once, the broker
will refuse to deliver any more until the delivered messages have been `ack`d,
which we cannot guarantee as we don't `ack` until `finalizeCheckpoint`.
I've chosen to allow the Consumer to block for up to 10 minutes, which means
that if more than 10 minutes elapse between successive calls to `advance`, the
Consumer will fail. I'm not entirely certain of the upstream impact, although
the failure will be handled by the configured channel `ExceptionHandler`, the
[default
implementation](https://github.com/rabbitmq/rabbitmq-java-client/blob/master/src/main/java/com/rabbitmq/client/impl/StrictExceptionHandler.java#L61)
of which will close the Connection. It's unclear to me whether we want to modify
this, extend the time, make the time configurable, or leave it as is.
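
As a rough illustration of the bounded wait on the delivery side, the sketch below uses a timed `offer` and throws when the slot stays full. The class `BoundedHandoff`, its `deliver` method, and the choice of `IllegalStateException` are assumptions made for this example and may not match the PR's code. The wait is a constructor parameter only so the behaviour can be shown without waiting 10 minutes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a handoff whose delivery side gives up after a bounded wait. */
class BoundedHandoff<T> {
  private final BlockingQueue<T> slot = new LinkedBlockingQueue<>(1);
  private final long maxWait;
  private final TimeUnit maxWaitUnit;

  BoundedHandoff(long maxWait, TimeUnit maxWaitUnit) {
    this.maxWait = maxWait;
    this.maxWaitUnit = maxWaitUnit;
  }

  /** Delivery side: waits up to maxWait for the previous message to be taken, then fails. */
  void deliver(T message) throws InterruptedException {
    if (!slot.offer(message, maxWait, maxWaitUnit)) {
      // Exception type is an assumption made for this sketch.
      throw new IllegalStateException(
          "previous message not consumed within " + maxWait + " " + maxWaitUnit);
    }
  }

  /** Reader side: timed, non-hanging read; returns null if the slot is empty. */
  T poll(long timeout, TimeUnit unit) throws InterruptedException {
    return slot.poll(timeout, unit);
  }
}
```

With `new BoundedHandoff<String>(10, TimeUnit.MINUTES)`, a second `deliver` would fail only if the reader did not `poll` for 10 minutes. Making the wait a parameter is one way the "configurable" option mentioned above could look.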
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 337865)
Time Spent: 1h (was: 50m)
> RabbitMqIO uses a deprecated API
> --------------------------------
>
> Key: BEAM-7434
> URL: https://issues.apache.org/jira/browse/BEAM-7434
> Project: Beam
> Issue Type: Bug
> Components: io-java-rabbitmq
> Reporter: Nicolas Delsaux
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> The RabbitMqIo class reader (UnboundedRabbitMqReader) uses the
> QueueingConsumer, which is denoted as deprecated on RabbitMq side. RabbitMqIo
> should replace this consumer with the DefaultConsumer provided by RabbitMq.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)