Hi Daniel,

This is probably insufficiently well documented. The CheckpointMark is used
for two purposes:
1) To persistently store some notion of how much of the stream has been
consumed, so that if something fails we can tell the underlying streaming
system where to start reading when we re-create the reader. This is why
CheckpointMark is Serializable. E.g. this makes sense for Kafka.
2) To do acks - to let the underlying streaming system know that the Beam
pipeline will never again need the data up to this CheckpointMark. Acking
does not require serializability - runners call finalizeCheckpoint(), which
does the ack, on the same in-memory instance of CheckpointMark that was
produced by the reader. E.g. this makes sense for RabbitMQ or Pubsub.
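
Purpose 1 can be sketched roughly as below. This is a toy model, not Beam
code: OffsetMark, LogReader and persistAndRestore are hypothetical names
made up for illustration, and plain Java serialization stands in for
whatever a runner actually does to persist the mark.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

/** Hypothetical stand-in for a checkpoint mark that records a resume position. */
final class OffsetMark implements Serializable {
  private static final long serialVersionUID = 1L;
  final int nextOffset; // index of the first record that has not been consumed

  OffsetMark(int nextOffset) {
    this.nextOffset = nextOffset;
  }
}

/** Hypothetical reader over an in-memory "log"; resumes from a mark if given one. */
final class LogReader {
  private final List<String> log;
  private int position;

  LogReader(List<String> log, OffsetMark restored) {
    this.log = log;
    this.position = (restored == null) ? 0 : restored.nextOffset;
  }

  /** Returns the next record, or null when the log is exhausted. */
  String read() {
    return position < log.size() ? log.get(position++) : null;
  }

  OffsetMark checkpoint() {
    return new OffsetMark(position);
  }
}

final class OffsetCheckpointDemo {
  /** Serializes and deserializes the mark, standing in for persistent storage. */
  static OffsetMark persistAndRestore(OffsetMark mark) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(mark);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (OffsetMark) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    List<String> log = List.of("a", "b", "c", "d");
    LogReader first = new LogReader(log, null);
    first.read(); // "a"
    first.read(); // "b"
    OffsetMark restored = persistAndRestore(first.checkpoint());

    // A new reader, e.g. after a failure, picks up at the stored offset.
    LogReader second = new LogReader(log, restored);
    System.out.println(second.read()); // prints "c"
  }
}
```

The mark here carries everything the new reader needs, which is exactly
why it has to survive serialization.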

In practice, these two capabilities tend to be mutually exclusive: some
streaming systems can provide a serializable CheckpointMark, some can do
acks, some can do neither - but very few (or none) can do both. It's
debatable whether it even makes sense for a system to provide both: acking
is usually an implicit form of streaming-system-side checkpointing, i.e.
when you re-create the reader you don't actually need to carry over any
information from an old CheckpointMark - the necessary state (which records
should be delivered) is maintained on the streaming system side.
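
To make the "state lives on the streaming system side" point concrete,
here is a toy sketch. ToyBroker is a hypothetical in-memory model, not the
RabbitMQ or Pubsub API, and its redelivery rule is simplified: whatever
was delivered but not acked becomes deliverable again once the consumer
goes away.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy broker: it, not the pipeline, remembers what is unacked. */
final class ToyBroker {
  private final Deque<String> ready = new ArrayDeque<>();
  private final Map<Long, String> unacked = new LinkedHashMap<>();
  private long nextTag = 1;

  void publish(String message) {
    ready.addLast(message);
  }

  /** Delivers the next message and returns its delivery tag, or -1 if none is ready. */
  long deliver() {
    String message = ready.pollFirst();
    if (message == null) {
      return -1;
    }
    unacked.put(nextTag, message);
    return nextTag++;
  }

  /** Returns the payload of a delivered, not yet acked message. */
  String payload(long tag) {
    return unacked.get(tag);
  }

  void ack(long tag) {
    unacked.remove(tag);
  }

  /** Consumer went away: unacked messages go back to the front, in order. */
  void consumerLost() {
    List<String> redeliver = new ArrayList<>(unacked.values());
    unacked.clear();
    for (int i = redeliver.size() - 1; i >= 0; i--) {
      ready.addFirst(redeliver.get(i));
    }
  }
}

final class BrokerSideStateDemo {
  public static void main(String[] args) {
    ToyBroker broker = new ToyBroker();
    broker.publish("m1");
    broker.publish("m2");
    broker.publish("m3");

    long t1 = broker.deliver(); // m1
    broker.deliver();           // m2, never acked
    broker.ack(t1);

    broker.consumerLost(); // the reader dies; no CheckpointMark is carried over

    // A brand-new reader simply asks for the next delivery.
    long t = broker.deliver();
    System.out.println(broker.payload(t)); // prints "m2"
  }
}
```

The new reader starts with no information from the old one and still gets
the right records, because the broker tracked the acks.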

These two are lumped together into one API simply because that was the best
design option we came up with (not for lack of trying, but suggestions very
much welcome - AFAIK nobody is happy with it).

RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
can do acks. So you can simply ignore the non-serializability.
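
A minimal sketch of what "ignore the non-serializability" could look like,
under stated assumptions: FakeChannel and AckingMark are hypothetical
stand-ins (not RabbitMqIO's classes or the RabbitMQ client API), and
treating finalization on a deserialized copy as a no-op is my assumption
for the sketch, not documented Beam behavior.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a broker channel; deliberately not Serializable. */
final class FakeChannel {
  final List<Long> acked = new ArrayList<>();

  void ack(long deliveryTag) {
    acked.add(deliveryTag);
  }
}

/** Hypothetical mark: Serializable to satisfy the interface, useful only in memory. */
final class AckingMark implements Serializable {
  private static final long serialVersionUID = 1L;

  private transient FakeChannel channel; // skipped by Java serialization
  private final List<Long> deliveryTags = new ArrayList<>();

  AckingMark(FakeChannel channel) {
    this.channel = channel;
  }

  void add(long deliveryTag) {
    deliveryTags.add(deliveryTag);
  }

  boolean hasChannel() {
    return channel != null;
  }

  /** Acks on the channel that delivered the messages, if this instance still has it. */
  void finalizeCheckpoint() {
    if (channel == null) {
      return; // assumption: a deserialized copy has nothing it can ack
    }
    for (long tag : deliveryTags) {
      channel.ack(tag);
    }
    deliveryTags.clear();
  }
}

final class AckingMarkDemo {
  /** Java-serialization round trip, wrapping checked exceptions. */
  static AckingMark roundTrip(AckingMark mark) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(mark);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (AckingMark) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    FakeChannel channel = new FakeChannel();
    AckingMark mark = new AckingMark(channel);
    mark.add(1);
    mark.add(2);

    AckingMark copy = roundTrip(mark);
    System.out.println(copy.hasChannel()); // prints "false"

    mark.finalizeCheckpoint(); // the in-memory instance acks on its own channel
    System.out.println(channel.acked); // prints "[1, 2]"
  }
}
```

The deserialized copy is harmless but useless for acking; only the
instance the reader produced still holds the channel that consumed the
messages.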

On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org> wrote:

> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
> As part of this I switched to a pull-based API rather than the
> previously-used push-based. This has caused some nebulous problems so
> I put up a correction PR that I think needs some eyes fairly quickly as
> I'd consider master to be broken for rabbitmq right now. The PR keeps
> the upgrade but reverts to the same push-based implementation as in 4.x:
> https://github.com/apache/beam/pull/9977 )
> Regardless, in trying to get the pull-based API to work, I'm finding the
> interactions between rabbitmq and beam with CheckpointMark to be
> fundamentally impossible to implement so I'm hoping for some input here.
> CheckpointMark itself must be Serializable, presumably this means it gets
> shuffled around between nodes. However 'Channel', the tunnel through
> which it communicates with Rabbit to ack messages and finalize the
> checkpoint, is non-Serializable. As in most other CheckpointMark
> implementations, the Channel field is 'transient'. When a new CheckpointMark is
> instantiated, it's given a Channel. If an existing one is supplied to
> the Reader's constructor (part of the 'startReader()' interface), the
> channel is overwritten.
> *However*, Rabbit does not support 'ack'ing messages on a channel other
> than the one that consumed them in the first place. Attempting to do so
> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
> ).
> Truthfully, I don't really understand how the current implementation is
> working; it seems like a happy accident. But I'm curious if someone
> could help me debug and implement how to bridge the
> re-usable/serializable CheckpointMark requirement in Beam with this
> limitation of Rabbit.
> Thanks,
> -Daniel Robert

