Hi Daniel, I will review and work on this IO.
Sorry for the delay, I wasn't super active on Beam recently, but now back ;)

Regards
JB

On 31/10/2019 21:38, Daniel Robert wrote:
> I'm pretty new to the Beam ecosystem, so apologies if this is not the
> right forum for this.
>
> My team has been learning and starting to use Beam for the past few
> months and have run into myriad problems with the RabbitIO connector for
> java, aspects of which seem perhaps fundamentally broken or incorrect in
> the released implementation. I've tracked our significant issues down
> and opened tickets and PRs for them. I'm not certain what the typical
> response time is, but given the severity of the issues (as I perceive
> them), I'd like to escalate some of these PRs and try to get some fixes
> into the next Beam release.
>
> I originally opened BEAM-8390 (https://github.com/apache/beam/pull/9782)
> as it was impossible to set the 'useCorrelationId' property (implying
> this functionality was also untested). Since then, I've found and PR'd
> the following, which are awaiting feedback/approval:
>
> 1. Watermarks not advancing
>
> Ticket/PR: BEAM-8347 - https://github.com/apache/beam/pull/9820
>
> Impact: under low message volumes, the watermark never advances and
> windows can never 'on time' fire.
>
> Notes: The RabbitMq UnboundedSource uses 'oldest known time' as a
> watermark when other similar sources (and documentation on watermarking)
> state for monotonically increasing timestamps (the case with a queue) it
> should be the most recent time. I have a few open questions about
> testing and implementation details in the PR but it should work as-is.
>
> 2. Exchanges are always declared, which fail if a pre-existing exchange
> differs
>
> Ticket/PR: BEAM-8513 - https://github.com/apache/beam/pull/9937
>
> Impact: It is impossible to utilize an existing, durable exchange.
>
> Notes: I'm hooking Beam up to an existing topic exchange (an 'event
> bus') that is 'durable'.
> RabbitMqIO current implementation will always
> attempt to declare the exchange, and does so as non-durable, which
> causes rabbit to fail the declaration. (Interestingly qpid does not fail
> in this scenario.) The PR allows the caller to disable declaring the
> exchange, similar to `withQueueDeclare` for declaring a queue.
>
> This PR also calls out a lot of the documentation that seems misleading;
> implying that one either interacts with queues *or* exchanges when that
> is not how AMQP fundamentally operates. The implementation of the
> RabbitMqIO connector before this PR seems like it probably works with
> the default exchange and maybe a fanout exchange, but not a topic exchange.
>
> 3. Library versions
>
> Tickets/PR: BEAM-7434, BEAM-5895, and BEAM-5894 -
> https://github.com/apache/beam/pull/9900
>
> Impact: The rabbitmq amqp client for java released the 5.x line in
> September of 2017. Some automated tickets were open to upgrade, plus a
> manual ticket to drop the use of the deprecated QueueConsumer API.
>
> Notes: The upgrade was relatively simple, but I implemented it using a
> pull-based API rather than push-based (Consumer) which may warrant some
> discussion. I'm used to discussing this type of thing over PRs but I'm
> happy to do whatever the community prefers.
>
> ---
>
> Numbers 1 and 2 above are 'dealbreaker' issues for my team. They
> effectively make rabbitmq unusable as an unbounded source, forcing
> developers to fork and modify the code. Number 3 is much less
> significant and I've put it here more for 'good, clean living' than an
> urgent issue.
>
> Aside from the open issues, given the low response rate so far, I'd be
> more than happy to take on a more active role in maintaining/reviewing
> the rabbitmq io for java. For now, however, is this list the best way to
> 'bump' these open issues and move forward? Further, is the general
> approach before opening a PR to ask some preliminary questions in this
> email list?
>
> Thank you,
> -Daniel Robert
>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
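
For readers following item 1 of the quoted message (watermarks not advancing), here is a minimal, self-contained sketch of the difference between the two watermark choices. It is not the RabbitMqIO code and not the change in the PR. The class `WatermarkSketch` and its methods are hypothetical names, and it assumes that 'oldest known time' means the minimum timestamp seen so far and that a window may fire 'on time' once the watermark reaches the window end.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

/** Hypothetical illustration only; none of these names come from Beam. */
public class WatermarkSketch {

    /** Assumed reading of 'oldest known time': the minimum timestamp seen so far. */
    static Instant oldestKnown(List<Instant> seen) {
        return seen.stream().min(Instant::compareTo).orElse(Instant.EPOCH);
    }

    /** The alternative for monotonically increasing timestamps: the most recent one. */
    static Instant mostRecent(List<Instant> seen) {
        return seen.stream().max(Instant::compareTo).orElse(Instant.EPOCH);
    }

    /** Simplified firing rule: the watermark has reached the end of the window. */
    static boolean windowCanFire(Instant watermark, Instant windowEnd) {
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2019-10-31T21:00:00Z");
        // Three messages arriving at low volume, with increasing timestamps.
        List<Instant> seen = List.of(t0, t0.plusSeconds(10), t0.plusSeconds(70));
        // A one-minute window starting at t0.
        Instant windowEnd = t0.plus(Duration.ofMinutes(1));

        // The oldest-known watermark stays at t0, so the window never fires.
        System.out.println("oldest-known fires: "
                + windowCanFire(oldestKnown(seen), windowEnd));
        // The most-recent watermark is t0+70s, past the window end.
        System.out.println("most-recent fires: "
                + windowCanFire(mostRecent(seen), windowEnd));
    }
}
```

Under these assumptions the first line prints `false` and the second prints `true`, which matches the symptom described in the quoted message: with the oldest timestamp as the watermark, windows cannot fire on time no matter how many later messages arrive.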