Just to clarify one thing: CheckpointMark does not need to be Java
Seralizable. All that's needed is do return a Coder for the CheckpointMark
in getCheckpointMarkCoder.

On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov <j...@google.com> wrote:

> Hi Daniel,
>
> This is probably insufficiently well documented. The CheckpointMark is
> used for two purposes:
> 1) To persistently store some notion of how much of the stream has been
> consumed, so that if something fails we can tell the underlying streaming
> system where to start reading when we re-create the reader. This is why
> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
> 2) To do acks - to let the underlying streaming system know that the Beam
> pipeline will never need data up to this CheckpointMark. Acking does not
> require serializability - runners call ack() on the same in-memory instance
> of CheckpointMark that was produced by the reader. E.g. this makes sense
> for RabbitMq or Pubsub.
>
> In practice, these two capabilities tend to be mutually exclusive: some
> streaming systems can provide a serializable CheckpointMark, some can do
> acks, some can do neither - but very few (or none) can do both, and it's
> debatable whether it even makes sense for a system to provide both
> capabilities: usually acking is an implicit form of streaming-system-side
> checkpointing, i.e. when you re-create the reader you don't actually need
> to carry over any information from an old CheckpointMark - the necessary
> state (which records should be delivered) is maintained on the streaming
> system side.
>
> These two are lumped together into one API simply because that was the
> best design option we came up with (not for lack of trying, but suggestions
> very much welcome - AFAIK nobody is happy with it).
>
> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
> can do acks. So you can simply ignore the non-serializability.
>
> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org>
> wrote:
>
>> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
>> As part of this I switched to a pull-based API rather than the
>> previously-used push-based. This has caused some nebulous problems so
>> put up a correction PR that I think needs some eyes fairly quickly as
>> I'd consider master to be broken for rabbitmq right now. The PR keeps
>> the upgrade but reverts to the same push-based implementation as in 4.x:
>> https://github.com/apache/beam/pull/9977 )
>>
>> Regardless, in trying to get the pull-based API to work, I'm finding the
>> interactions between rabbitmq and beam with CheckpointMark to be
>> fundamentally impossible to implement so I'm hoping for some input here.
>>
>> CheckointMark itself must be Serializable, presumably this means it gets
>> shuffled around between nodes. However 'Channel', the tunnel through
>> which it communicates with Rabbit to ack messages and finalize the
>> checkpoint, is non-Serializable. Like most other CheckpointMark
>> implementations, Channel is 'transient'. When a new CheckpointMark is
>> instantiated, it's given a Channel. If an existing one is supplied to
>> the Reader's constructor (part of the 'startReader()' interface), the
>> channel is overwritten.
>>
>> *However*, Rabbit does not support 'ack'ing messages on a channel other
>> than the one that consumed them in the first place. Attempting to do so
>> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>>
>> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>> ).
>>
>> Truthfully, I don't really understand how the current implementation is
>> working; it seems like a happy accident. But I'm curious if someone
>> could help me debug and implement how to bridge the
>> re-usable/serializable CheckpointMark requirement in Beam with this
>> limitation of Rabbit.
>>
>> Thanks,
>> -Daniel Robert
>>
>>

Reply via email to