Just a thought: instead of embedding the RabbitMQ streams inside the checkpoint mark, could you keep a global static map of RabbitMQ streams keyed by a unique UUID. Then all you have to serialize inside the CheckpointMark is the UUID; you can look up the actual stream in the constructor of the CheckpointMark and cache it in a volatile variable that won't be serialized.
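A minimal sketch of this idea follows. All names here (ChannelRegistry, RabbitCheckpointMark) are invented for illustration, and a plain Object stands in for a live RabbitMQ channel; only the UUID crosses the serialization boundary, and the transient volatile field is re-resolved lazily from the static map:

```java
import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical process-wide registry of live channels, keyed by UUID.
class ChannelRegistry {
    private static final Map<UUID, Object> CHANNELS = new ConcurrentHashMap<>();

    static UUID register(Object channel) {
        UUID id = UUID.randomUUID();
        CHANNELS.put(id, channel);
        return id;
    }

    static Object lookup(UUID id) {
        return CHANNELS.get(id); // null if we are on a new JVM after a crash
    }
}

// Only the UUID is serialized; the live channel is looked up, never shipped.
class RabbitCheckpointMark implements Serializable {
    private final UUID channelId;
    private transient volatile Object channel;

    RabbitCheckpointMark(UUID channelId) {
        this.channelId = channelId;
        this.channel = ChannelRegistry.lookup(channelId);
    }

    Object channel() {
        if (channel == null) {
            // After deserialization the transient field is null; re-resolve.
            // On a different machine this lookup would also return null,
            // which is the "wait for the broker to redeliver" case.
            channel = ChannelRegistry.lookup(channelId);
        }
        return channel;
    }
}
```

On the same JVM a serialize/deserialize round trip recovers the identical channel instance; on a new machine the lookup fails and the source would have to fall back on broker-side redelivery, as described above.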
This does mean that if the source shard starts up on a new machine (this will happen after a crash or if a runner load balances to another machine) then you cannot recover the same RabbitMQ stream. I presume (hope!) that RabbitMQ must have some sort of ack timeout and will redeliver the messages after a while. In this case those messages will get "stuck" until RabbitMQ redelivers them, but will eventually show up again on the new RabbitMQ stream. (I hope that opening a new stream would not redeliver messages that had already been successfully acked on the previous stream.) Would this work?

Reuven

On Thu, Nov 14, 2019 at 7:16 AM Daniel Robert <daniel.rob...@acm.org> wrote:

> We may be talking past each other a bit, though I do appreciate the responses.
>
> Rabbit behaves a lot like a relational database in terms of the state required. A connection is analogous to a database connection, and a channel (poor analogy here) is similar to an open transaction. If the connection is severed, the transaction will not be able to be committed.
>
> In direct response to the consumer lifecycle linked to: yes, one can recover and re-establish connections, but any state maintained within the previous channel is lost. If there were messages that had not been acknowledged, they would have been re-delivered to some other consumer, as they were never acknowledged.
>
> "Subscription" isn't really the model in rabbit. It has advantages and disadvantages when compared with kafka -- mostly out of scope here -- but some quick advantages of the rabbit model: 1) it parallelizes "infinitely" without any changes to the server (no re-partitioning or the like); 2) messages can be acknowledged in a different order than they were consumed; 3) because state is managed in association with an active connection, at-least-once delivery semantics are easy to implement, as any disconnection will result in the messages being re-placed in the queue and delivered to a new consumer.
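The channel-scoped behavior described here -- delivery tags are only valid on the channel that delivered the message, and anything unacked is requeued when that channel goes away -- can be illustrated with a toy simulation. FakeBroker and FakeChannel are invented stand-ins, not the real com.rabbitmq.client API:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Invented stand-in for a broker holding one queue.
class FakeBroker {
    final Deque<String> queue = new ArrayDeque<>();
}

// Invented stand-in for a channel; delivery tags are scoped to it.
class FakeChannel {
    private final FakeBroker broker;
    private final Map<Long, String> unacked = new HashMap<>(); // tag -> message
    private long nextTag = 1;
    private boolean open = true;

    FakeChannel(FakeBroker broker) { this.broker = broker; }

    // Pull one message; it stays "unacked" on THIS channel until basicAck.
    long basicGet() {
        String msg = broker.queue.removeFirst();
        long tag = nextTag++;
        unacked.put(tag, msg);
        return tag;
    }

    // An unknown tag is the analogue of RabbitMQ's 406 PRECONDITION_FAILED.
    void basicAck(long tag) {
        if (!open || !unacked.containsKey(tag)) {
            throw new IllegalStateException(
                "406 PRECONDITION_FAILED - unknown delivery tag " + tag);
        }
        unacked.remove(tag);
    }

    // Closing (or losing) the channel requeues everything unacked.
    void close() {
        open = false;
        broker.queue.addAll(unacked.values());
        unacked.clear();
    }
}
```

Acking a tag on a second channel fails, and closing the first channel puts the message back on the queue for a new consumer -- which is exactly why a deserialized CheckpointMark on a fresh channel cannot finalize anything.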
> To say it's "incompatible with any fault tolerant semantics" is unfair; they just aren't compatible with Beam's, as Beam is currently implemented.
>
> Regardless, I'm now wondering what the best path forward is. Rabbit isn't unusable in Beam if the set of requirements and tradeoffs are well documented. That is, there are use cases that could be properly supported and some that likely can't.
>
> One option would be to use a pull-based API and immediately acknowledge each message as it arrives. This would effectively make the CheckpointMark a no-op, other than maintaining the watermark. In a pipeline that uses fixed windows (or other non-session windowing) and a runner that supports 'Drain'-style semantics (like Dataflow), this should work just fine, I think.
>
> Another would be a best attempt at acknowledging as late as possible. This would be a hybrid approach where we attempt acknowledgements in the CheckpointMark, but use a special Coder that acknowledges all messages at the point the CheckpointMark is encoded. I think this feels a bit unsafe and overly complex, and I'm not sure it solves any real-world problems.
>
> I also feel like perhaps we should include Beam IO documentation that makes it clear that an unbounded source that requires a persistent connection for state tracking is not supportable by Beam.
>
> Thanks,
> -Danny
>
> On 11/14/19 7:49 AM, Jan Lukavský wrote:
>
> Hi, as I said, I didn't dig too deep into that, but what I saw was [1]. Generally, if RabbitMQ had no way to recover a subscription (which I don't think is the case), then it would not only be incompatible with Beam, but actually incompatible with any fault tolerant semantics.
>
> [1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle
>
> On 14 Nov 2019 at 13:06, Daniel Robert <daniel.rob...@acm.org> wrote:
>
> On 11/14/19 2:32 AM, Jan Lukavský wrote:
>
> Hi Danny,
>
> as Eugene pointed out, there are essentially two "modes of operation" of a CheckpointMark. It can:
>
> a) be used to somehow restore the state of a reader (in the call to UnboundedSource#createReader)
>
> b) confirm processed elements in CheckpointMark#finalizeCheckpoint
>
> If your source doesn't provide a persistent position in the data stream that can be referred to (and serialized -- an example of this would be Kafka offsets), then what you actually need to serialize is not the channel, but a way to restore it -- e.g. by opening a new channel with a given 'consumer group name'. Then you just use this checkpoint to commit your processed data in finalizeCheckpoint.
>
> Note that finalizeCheckpoint is not guaranteed to be called -- that can happen in cases where an error occurs and the source has to be rewound -- which is what the direct runner emulates with the probability of 'readerReuseChance'.
>
> I'm reading the documentation of RabbitMQ very quickly, but if I understand it correctly, you have to create a subscription to the broker, serialize the identifier of the subscription into the CheckpointMark, and then just recover the subscription in the call to UnboundedSource#createReader. That should do the trick.
>
> I have not seen any such documentation in rabbit. My understanding is it has to be the same physical connection and channel. Can you cite the source you were looking at?
>
> -Danny
>
> Hope this helps; sorry if I'm not using 100% correct RabbitMQ terminology -- as I said, I'm not quite familiar with it.
>
> Best,
>
> Jan
>
> On 11/14/19 5:26 AM, Daniel Robert wrote:
>
> I believe I've nailed down a situation that happens in practice that causes Beam and Rabbit to be incompatible.
> It seems that runners can and do make assumptions about the serializability (via a Coder) of a CheckpointMark.
>
> To start, these are the semantics of RabbitMQ:
>
> - the client establishes a connection to the server
> - the client opens a channel on the connection
> - messages are either pulled or pushed to the client from the server along this channel
> - when messages are done processing, they are acknowledged *client-side* and must be acknowledged on the *same channel* that originally received the message
>
> Since a channel (or any open connection) is non-serializable, it means that a CheckpointMark that has been serialized cannot ever be used to acknowledge these messages and correctly 'finalize' the checkpoint. It also, as previously discussed in this thread, implies a rabbit Reader cannot accept an existing CheckpointMark at all; the Reader and the CheckpointMark must share the same connection to the rabbit server ("channel").
>
> Next, I've found how DirectRunner (and presumably others) can attempt to serialize a CheckpointMark that has not been finalized. In https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150, the DirectRunner applies a probability and, if it hits, sets the current reader to 'null' but retains the existing CheckpointMark, which it then attempts to pass to a new reader via a Coder.
>
> This leaves the shard, the runner, and the reader with differing views of the world. In UnboundedReadEvaluatorFactory's processElement function, a call to getReader(shard) (https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132) clones the shard's checkpoint mark and passes that to the new reader.
> The reader ignores it, creating its own, but even if it accepted it, it would be accepting a serialized CheckpointMark, which wouldn't work. Later, the runner calls finishRead (https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246). The shard's CheckpointMark (unserialized, and so still valid) is finalized. The reader's CheckpointMark (which may be a different instance) becomes the return value, which is referred to as "finishedCheckpoint" in the calling code -- misleading at best and problematic at worst, as *this* checkpoint has not been finalized.
>
> So, tl;dr: I cannot find any means of maintaining a persistent connection to the server for finalizing checkpoints that is safe across runners. If there's a guarantee all of the shards are on the same JVM instance, I could rely on global, static collections/instances as a workaround, but if other runners might serialize this across the wire, I'm stumped. The only workable approach I can think of right now is to proactively acknowledge messages as they are received and effectively no-op in finalizeCheckpoint. This is very different, semantically, and can lead to dropped messages if a pipeline doesn't finish processing the given message.
>
> Any help would be much appreciated.
>
> Thanks,
> -Danny
>
> On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>
> Hi Daniel,
>
> This is probably insufficiently well documented. The CheckpointMark is used for two purposes:
>
> 1) To persistently store some notion of how much of the stream has been consumed, so that if something fails we can tell the underlying streaming system where to start reading when we re-create the reader. This is why CheckpointMark is Serializable. E.g. this makes sense for Kafka.
>
> 2) To do acks -- to let the underlying streaming system know that the Beam pipeline will never need data up to this CheckpointMark.
> Acking does not require serializability -- runners call ack() on the same in-memory instance of the CheckpointMark that was produced by the reader. E.g. this makes sense for RabbitMQ or Pubsub.
>
> In practice, these two capabilities tend to be mutually exclusive: some streaming systems can provide a serializable CheckpointMark, some can do acks, some can do neither -- but very few (or none) can do both, and it's debatable whether it even makes sense for a system to provide both capabilities: usually acking is an implicit form of streaming-system-side checkpointing, i.e. when you re-create the reader you don't actually need to carry over any information from an old CheckpointMark -- the necessary state (which records should be delivered) is maintained on the streaming system side.
>
> These two are lumped together into one API simply because that was the best design option we came up with (not for lack of trying, but suggestions are very much welcome -- AFAIK nobody is happy with it).
>
> RabbitMQ is under #2 -- it can't do serializable checkpoint marks, but it can do acks. So you can simply ignore the non-serializability.
>
> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org> wrote:
>
> (Background: I recently upgraded RabbitMqIO from the 4.x to the 5.x library. As part of this I switched to a pull-based API rather than the previously-used push-based one. This has caused some nebulous problems, so I put up a correction PR that I think needs some eyes fairly quickly, as I'd consider master to be broken for rabbitmq right now. The PR keeps the upgrade but reverts to the same push-based implementation as in 4.x: https://github.com/apache/beam/pull/9977)
>
> Regardless, in trying to get the pull-based API to work, I'm finding the interactions between rabbitmq and beam with CheckpointMark to be fundamentally impossible to implement, so I'm hoping for some input here.
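The "pull and immediately ack" fallback discussed earlier in the thread can be sketched as follows. PullChannel and ImmediateAckReader are invented minimal stand-ins -- real code would use com.rabbitmq.client.Channel and Beam's UnboundedSource.UnboundedReader -- and the sketch deliberately trades at-least-once for at-most-once: once the ack fires, a pipeline failure drops the message:

```java
// Invented stand-in for a pull-based channel.
interface PullChannel {
    String basicGet();          // returns null when the queue is empty
    void basicAck(String msg);  // acknowledge a delivered message
}

class ImmediateAckReader {
    private final PullChannel channel;
    private String current;

    ImmediateAckReader(PullChannel channel) { this.channel = channel; }

    // Pull one message and ack it right away: the broker forgets it
    // immediately, so a later CheckpointMark has nothing left to do
    // beyond watermark bookkeeping -- finalizeCheckpoint becomes a no-op.
    boolean advance() {
        String msg = channel.basicGet();
        if (msg == null) {
            return false;
        }
        channel.basicAck(msg); // at-most-once from here on
        current = msg;
        return true;
    }

    String getCurrent() { return current; }
}
```

Because nothing stateful survives the advance() call, the CheckpointMark no longer needs the channel at all, which sidesteps the serializability problem at the cost of possible message loss mid-pipeline.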
> CheckpointMark itself must be Serializable; presumably this means it gets shuffled around between nodes. However 'Channel', the tunnel through which it communicates with Rabbit to ack messages and finalize the checkpoint, is non-Serializable. Like most other CheckpointMark implementations, the Channel is 'transient'. When a new CheckpointMark is instantiated, it's given a Channel. If an existing one is supplied to the Reader's constructor (part of the 'startReader()' interface), the channel is overwritten.
>
> *However*, Rabbit does not support 'ack'ing messages on a channel other than the one that consumed them in the first place. Attempting to do so results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed).
>
> Truthfully, I don't really understand how the current implementation is working; it seems like a happy accident. But I'm curious if someone could help me debug and implement how to bridge the re-usable/serializable CheckpointMark requirement in Beam with this limitation of Rabbit.
>
> Thanks,
> -Daniel Robert
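The trap described in this first message is easy to demonstrate in isolation: a transient field does not survive a Java serialize/deserialize round trip, which is what happens to the live Channel whenever a runner encodes a CheckpointMark. Mark and its fields are invented names, with a plain Object standing in for a Channel:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Invented stand-in for a CheckpointMark that holds a live channel.
class Mark implements Serializable {
    final String watermark;   // serializable state survives the round trip
    transient Object channel; // the live connection does not

    Mark(String watermark, Object channel) {
        this.watermark = watermark;
        this.channel = channel;
    }

    // Serialize and deserialize, as a Coder effectively does.
    static Mark roundTrip(Mark m) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(m);
        oos.flush();
        return (Mark) new ObjectInputStream(
            new ByteArrayInputStream(bos.toByteArray())).readObject();
    }
}
```

After the round trip the watermark is intact but the channel is null, so any ack attempted through the copy has nowhere to go -- the crux of the incompatibility discussed in this thread.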