(Background: I recently upgraded RabbitMqIO from the 4.x to the 5.x library.
As part of this I switched to a pull-based API rather than the
previously used push-based one. This has caused some nebulous problems, so
I put up a corrective PR that I think needs some eyes fairly quickly, as
I'd consider master to be broken for RabbitMQ right now. The PR keeps
the upgrade but reverts to the same push-based implementation as in 4.x:
https://github.com/apache/beam/pull/9977 )
Regardless, in trying to get the pull-based API to work, I'm finding the
interactions between RabbitMQ and Beam's CheckpointMark to be
fundamentally impossible to implement, so I'm hoping for some input here.
CheckpointMark itself must be Serializable; presumably this means it gets
shuffled around between nodes. However, 'Channel', the tunnel through
which the reader communicates with Rabbit to ack messages and finalize the
checkpoint, is not Serializable. As in most other CheckpointMark
implementations, the Channel field is therefore 'transient'. When a new CheckpointMark is
instantiated, it's given a Channel. If an existing one is supplied to
the reader's constructor (via the source's 'createReader()' method), its
channel is overwritten.
*However*, Rabbit does not support 'ack'ing messages on a channel other
than the one that consumed them in the first place. Attempting to do so
results in a '406 (PRECONDITION_FAILED) - unknown delivery tag' error (see
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
).
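The constraint can be illustrated with a small simulation. 'SimulatedChannel' and 'PreconditionFailedException' are hypothetical and model only one property of RabbitMQ: delivery tags are per-channel counters starting at 1, and a channel rejects tags it did not issue. The real client call is 'Channel.basicAck(long deliveryTag, boolean multiple)'; here it is simplified to a single tag, and the simulation does not model the broker closing the channel after the error.

```java
import java.util.HashSet;
import java.util.Set;

public class ChannelScopedAckDemo {

  /** Hypothetical error mirroring RabbitMQ's 406 PRECONDITION_FAILED channel error. */
  static class PreconditionFailedException extends RuntimeException {
    PreconditionFailedException(String message) { super(message); }
  }

  /** Hypothetical channel: issues its own tags (1, 2, 3, ...) and only accepts acks for them. */
  static class SimulatedChannel {
    private long nextTag = 1;
    private final Set<Long> unacked = new HashSet<>();

    /** Simulates the broker delivering one message on this channel. */
    long deliver() {
      long tag = nextTag++;
      unacked.add(tag);
      return tag;
    }

    /** Acks a tag, failing if this channel has no unacked delivery with that tag. */
    void basicAck(long deliveryTag) {
      if (!unacked.remove(deliveryTag)) {
        throw new PreconditionFailedException(
            "406 PRECONDITION_FAILED - unknown delivery tag " + deliveryTag);
      }
    }
  }

  public static void main(String[] args) {
    SimulatedChannel consuming = new SimulatedChannel();
    SimulatedChannel fresh = new SimulatedChannel(); // e.g. the channel a new reader opens

    long tag = consuming.deliver(); // tag 1 on the consuming channel
    try {
      fresh.basicAck(tag);
    } catch (PreconditionFailedException e) {
      System.out.println(e.getMessage()); // 406 PRECONDITION_FAILED - unknown delivery tag 1
    }
    consuming.basicAck(tag); // succeeds on the original channel
    System.out.println("acked on original channel");
  }
}
```

Because tags restart at 1 on every channel in this simulation, a stale tag can also coincide with an unrelated unacked delivery on the new channel; the ack would then succeed against the wrong message instead of failing.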
Truthfully, I don't really understand how the current implementation
works at all; it seems like a happy accident. But I'm curious whether
someone could help me debug this and work out how to bridge Beam's
requirement for a reusable, serializable CheckpointMark with this
limitation of Rabbit.
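One possible direction, offered only as a hypothetical sketch and not as something RabbitMqIO does: keep channels in a per-JVM registry and serialize only an id plus the delivery tags. 'AckTarget', 'REGISTRY', and 'RegistryCheckpointMark' are made-up names; a real version would wrap com.rabbitmq.client.Channel, and its 'finalizeCheckpoint()' returns a boolean here for illustration, unlike Beam's void method. This does not lift Rabbit's restriction. When the mark is finalized on a worker that does not hold the original channel, it skips the ack and assumes the broker requeues the unacknowledged deliveries once that channel closes, so the result is at-least-once delivery with possible duplicates.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class ChannelRegistrySketch {

  /** Hypothetical minimal view of a channel; a real version would wrap a RabbitMQ Channel. */
  interface AckTarget {
    boolean isOpen();
    void ack(long deliveryTag);
  }

  /** Hypothetical per-JVM registry: channels stay on the worker that opened them. */
  static final Map<String, AckTarget> REGISTRY = new ConcurrentHashMap<>();

  /** Registers a channel and returns the serializable id that stands in for it. */
  static String register(AckTarget channel) {
    String id = UUID.randomUUID().toString();
    REGISTRY.put(id, channel);
    return id;
  }

  /** Hypothetical checkpoint mark: stores the channel id and tags, never the channel itself. */
  static class RegistryCheckpointMark implements Serializable {
    private static final long serialVersionUID = 1L;
    final String channelId;
    final List<Long> deliveryTags;

    RegistryCheckpointMark(String channelId, List<Long> deliveryTags) {
      this.channelId = channelId;
      this.deliveryTags = new ArrayList<>(deliveryTags);
    }

    /** Returns true if the tags were acked, false if the original channel is unavailable here. */
    boolean finalizeCheckpoint() {
      AckTarget channel = REGISTRY.get(channelId);
      if (channel == null || !channel.isOpen()) {
        // Cannot ack on a different channel; assume redelivery after the original one closes.
        return false;
      }
      for (long tag : deliveryTags) {
        channel.ack(tag);
      }
      return true;
    }
  }

  public static void main(String[] args) {
    List<Long> acked = new ArrayList<>();
    AckTarget channel = new AckTarget() {
      public boolean isOpen() { return true; }
      public void ack(long deliveryTag) { acked.add(deliveryTag); }
    };
    String id = register(channel);

    RegistryCheckpointMark local = new RegistryCheckpointMark(id, Arrays.asList(1L, 2L));
    System.out.println(local.finalizeCheckpoint() + " " + acked); // true [1, 2]

    RegistryCheckpointMark elsewhere = new RegistryCheckpointMark("unknown-id", Arrays.asList(3L));
    System.out.println(elsewhere.finalizeCheckpoint()); // false
  }
}
```

A real version would also need to remove registry entries when channels close, and could ack the highest tag once with multiple=true instead of making one call per tag.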
Thanks,
-Daniel Robert
- RabbitMQ and CheckpointMark feasibility Daniel Robert