Just to clarify one thing: CheckpointMark does not need to be Java Serializable. All that's needed is to return a Coder for the CheckpointMark from getCheckpointMarkCoder.
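To make that concrete, here is a minimal, self-contained sketch of the idea. It does not use the real Beam SDK (Beam's actual Coder API has more surface - CustomCoder, verifyDeterministic, etc.), and all class names here (OffsetCheckpointMark, OffsetCheckpointMarkCoder) are hypothetical. The point is only that a CheckpointMark can stay non-Serializable as long as something can encode/decode it to bytes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public final class OffsetCheckpointDemo {

  // Note: deliberately does NOT implement java.io.Serializable.
  static final class OffsetCheckpointMark {
    final String partitionId;
    final long offset;

    OffsetCheckpointMark(String partitionId, long offset) {
      this.partitionId = partitionId;
      this.offset = offset;
    }
  }

  // Minimal stand-in for what getCheckpointMarkCoder would return:
  // hand-rolled byte-level encode/decode of the mark's fields.
  static final class OffsetCheckpointMarkCoder {
    void encode(OffsetCheckpointMark value, OutputStream out) throws IOException {
      DataOutputStream dos = new DataOutputStream(out);
      dos.writeUTF(value.partitionId);
      dos.writeLong(value.offset);
      dos.flush();
    }

    OffsetCheckpointMark decode(InputStream in) throws IOException {
      DataInputStream dis = new DataInputStream(in);
      return new OffsetCheckpointMark(dis.readUTF(), dis.readLong());
    }
  }

  public static void main(String[] args) throws IOException {
    OffsetCheckpointMarkCoder coder = new OffsetCheckpointMarkCoder();
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    coder.encode(new OffsetCheckpointMark("topic-0", 42L), bytes);
    OffsetCheckpointMark roundTripped =
        coder.decode(new ByteArrayInputStream(bytes.toByteArray()));
    // Prints: topic-0:42
    System.out.println(roundTripped.partitionId + ":" + roundTripped.offset);
  }
}
```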
On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov <j...@google.com> wrote:

> Hi Daniel,
>
> This is probably insufficiently well documented. The CheckpointMark is
> used for two purposes:
> 1) To persistently store some notion of how much of the stream has been
> consumed, so that if something fails we can tell the underlying streaming
> system where to start reading when we re-create the reader. This is why
> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
> 2) To do acks - to let the underlying streaming system know that the Beam
> pipeline will never need data up to this CheckpointMark. Acking does not
> require serializability - runners call ack() on the same in-memory instance
> of CheckpointMark that was produced by the reader. E.g. this makes sense
> for RabbitMq or Pubsub.
>
> In practice, these two capabilities tend to be mutually exclusive: some
> streaming systems can provide a serializable CheckpointMark, some can do
> acks, some can do neither - but very few (or none) can do both, and it's
> debatable whether it even makes sense for a system to provide both
> capabilities: usually acking is an implicit form of streaming-system-side
> checkpointing, i.e. when you re-create the reader you don't actually need
> to carry over any information from an old CheckpointMark - the necessary
> state (which records should be delivered) is maintained on the streaming
> system side.
>
> These two are lumped together into one API simply because that was the
> best design option we came up with (not for lack of trying, but suggestions
> very much welcome - AFAIK nobody is happy with it).
>
> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
> can do acks. So you can simply ignore the non-serializability.
>
> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org>
> wrote:
>
>> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
>> As part of this I switched to a pull-based API rather than the
>> previously-used push-based. This has caused some nebulous problems, so I
>> put up a correction PR that I think needs some eyes fairly quickly, as
>> I'd consider master to be broken for rabbitmq right now. The PR keeps
>> the upgrade but reverts to the same push-based implementation as in 4.x:
>> https://github.com/apache/beam/pull/9977 )
>>
>> Regardless, in trying to get the pull-based API to work, I'm finding the
>> interactions between rabbitmq and beam with CheckpointMark to be
>> fundamentally impossible to implement, so I'm hoping for some input here.
>>
>> CheckpointMark itself must be Serializable; presumably this means it gets
>> shuffled around between nodes. However 'Channel', the tunnel through
>> which it communicates with Rabbit to ack messages and finalize the
>> checkpoint, is non-Serializable. Like most other CheckpointMark
>> implementations, Channel is 'transient'. When a new CheckpointMark is
>> instantiated, it's given a Channel. If an existing one is supplied to
>> the Reader's constructor (part of the 'startReader()' interface), the
>> channel is overwritten.
>>
>> *However*, Rabbit does not support 'ack'ing messages on a channel other
>> than the one that consumed them in the first place. Attempting to do so
>> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>> ).
>>
>> Truthfully, I don't really understand how the current implementation is
>> working; it seems like a happy accident. But I'm curious if someone
>> could help me debug and implement how to bridge the
>> re-usable/serializable CheckpointMark requirement in Beam with this
>> limitation of Rabbit.
>>
>> Thanks,
>> -Daniel Robert
>>
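The core tension in the thread above can be simulated without the real RabbitMQ client or Beam SDK. The sketch below is entirely hypothetical (FakeChannel and RabbitCheckpointMark are stand-ins, not real Rabbit or Beam classes): it models delivery tags as channel-scoped, so acking on the in-memory mark the reader produced succeeds (Eugene's purpose #2), while replaying the same tags on a freshly created channel fails, mirroring the 406 PRECONDITION-FAILED / "unknown delivery tag" behavior Daniel describes:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class ChannelScopedAckDemo {

  // Hypothetical stand-in for a RabbitMQ Channel. Delivery tags are only
  // valid on the channel that issued them.
  static final class FakeChannel {
    private final Set<Long> outstandingTags = new HashSet<>();
    private long nextTag = 0;

    // Simulates consuming a message: assigns a channel-scoped delivery tag.
    long deliver() {
      long tag = ++nextTag;
      outstandingTags.add(tag);
      return tag;
    }

    // Simulates basicAck: tags this channel never issued are rejected,
    // analogous to the broker's 406 PRECONDITION-FAILED error.
    void ack(long deliveryTag) {
      if (!outstandingTags.remove(deliveryTag)) {
        throw new IllegalStateException(
            "406 (PRECONDITION-FAILED) - unknown delivery tag " + deliveryTag);
      }
    }
  }

  // Shape of the CheckpointMark under discussion: the tags are plain
  // serializable state, the channel is not (hence `transient`).
  static final class RabbitCheckpointMark {
    final List<Long> deliveryTags = new ArrayList<>();
    transient FakeChannel channel;

    RabbitCheckpointMark(FakeChannel channel) {
      this.channel = channel;
    }

    void finalizeCheckpoint() {
      for (long tag : deliveryTags) {
        channel.ack(tag);
      }
      deliveryTags.clear();
    }
  }

  public static void main(String[] args) {
    FakeChannel original = new FakeChannel();
    RabbitCheckpointMark mark = new RabbitCheckpointMark(original);
    mark.deliveryTags.add(original.deliver());

    // Purpose #2: acking the same in-memory instance on the same channel works.
    mark.finalizeCheckpoint();
    System.out.println("same-channel ack: ok");

    // Re-creating the reader swaps in a fresh channel; replaying tags from a
    // "deserialized" mark now fails, mirroring Daniel's 406 error.
    RabbitCheckpointMark restored = new RabbitCheckpointMark(new FakeChannel());
    restored.deliveryTags.add(original.deliver());
    try {
      restored.finalizeCheckpoint();
    } catch (IllegalStateException e) {
      System.out.println("cross-channel ack failed: " + e.getMessage());
    }
  }
}
```

This is why Eugene's framing resolves the issue: for Rabbit, the serialized tags in a restored mark are useless by construction, so the implementation only needs the ack path on the live instance, and the Serializable requirement can effectively be ignored.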