Just a thought: instead of embedding the RabbitMQ streams inside the
checkpoint mark, could you keep a global static map of RabbitMQ streams
keyed by a unique UUID. Then all you have to serialize inside the
CheckpointMark is the UUID; you can look up the actual stream in the
constructor of the CheckpointMark and cache it in a volatile variable that
won't be serialized.

This does mean that if the source shard starts up on a new machine (this
will happen after a crash or if a runner load balances to another machine)
then you cannot recover the same RabbitMQ stream. I presume (hope!) that
RabbitMQ must have some sort ack timeout and will redeliver the messages
after a while. In this case those messages will get "stuck" until RabbitMQ
redelivers them, but will eventually show up again on the new RabbitMQ
stream. (I hope that opening a new stream would not redeliver messages that
had already been successfully acked on the previous stream).

Would this work?

Reuven

On Thu, Nov 14, 2019 at 7:16 AM Daniel Robert <daniel.rob...@acm.org> wrote:

> We may be talking past each other a bit, though I do appreciate the
> responses.
>
> Rabbit behaves a lot like a relational database in terms of state
> required. A connection is analogous to a database connection, and a channel
> (poor analogy here) is similar to an open transaction. If the connection is
> severed, the transaction will not be able to be committed.
>
> In direct response to the consumer lifecycle linked to, yes, one can
> recover and re-establish connections, but any state maintained within the
> previous channel are lost. If there were messages that had not been
> acknowledged, they would have been re-delivered to some other consumer as
> they were never acknowledged.
>
> "Subscription" isn't really the model in rabbit. It has advantages and
> disadvantages when compared with kafka -- mostly out of scope here -- but
> some quick advantages of the rabbit model: 1) it parallelizes "infinitely"
> without any changes to server (no re-partitioning or the like); 2) messages
> can be acknowledge in a separate order than they were consumed; 3) because
> state is managed associated with an active connection, at-least-once
> delivery semantics are easy to implement as any disconnection will result
> in the messages being re-placed in the queue and delivered to a new
> consumer. To say it's "incompatible with any fault tolerant semantics" is
> unfair, they just aren't incompatible with Beam's, as Beam is currently
> implemented.
>
> Regardless, I'm now wondering what the best path forward is. Rabbit isn't
> unusable in Beam if the set of requirements and tradeoffs are well
> documented. That is, there are use cases that could be properly supported
> and some that likely can't.
>
> One option would be to use a pull-based api and immediately acknowledge
> each message as they arrive. This would effectively make the CheckpointMark
> a no-op, other than maintaining the watermark. In a pipeline that uses
> fixed windows (or non-session windowing) and uses a runner that supports
> 'Drain'-style semantics (like Dataflow) this should work just fine I think.
>
> Another would be to do a best-attempt at acknowledging as late as
> possible. This would be a hybrid approach where we attempt acknowledgements
> in the CheckpointMark, but use a special Coder that acknowledges all
> messages at the point the CheckpointMark is encoded. I think this feels a
> bit unsafe and overly complex, and I'm not sure it solves any real-world
> problems.
>
> I also feel like perhaps we should include Beam IO documentation that
> makes it clear that an unbounded source that requires a persistent
> connection for state tracking is not supportable by beam.
>
> Thanks,
> -Danny
> On 11/14/19 7:49 AM, Jan Lukavský wrote:
>
> Hi, as I said, I didn't dig too deep into that, but what I saw was [1].
> Generally, if RabbitMQ would have no way to recover subscription (which I
> don't think is the case), then it would not be incompatible with beam, but
> actually with would be incompatible any fault tolerant semantics.
>
> [1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle
>
> Dne 14. 11. 2019 13:06 napsal uživatel Daniel Robert
> <daniel.rob...@acm.org> <daniel.rob...@acm.org>:
>
>
> On 11/14/19 2:32 AM, Jan Lukavský wrote:
>
> Hi Danny,
>
> as Eugene pointed out, there are essentially two "modes of operation" of
> CheckpointMark. It can:
>
>  a) be used to somehow restore state of a reader (in call to
> UnboundedSource#createReader)
>
>  b) confirm processed elements in CheckpointMark#finalizeCheckpoint
>
> If your source doesn't provide a persistent position in data stream that
> can be referred to (and serialized - example of this would be kafka
> offsets), then what you actually need to serialize is not the channel, but
> a way how to restore it - e.g. by opening a new channel with a given
> 'consumer group name'. Then you just use this checkpoint to commit your
> processed data in finalizeCheckpoint.
>
> Note that the finalizeCheckpoint is not guaranteed to be called - that can
> happen in cases when an error occurs and the source has to be rewind back -
> that is what direct runner emulates with the probability of
> 'readerReuseChance'.
>
> I'm reading the documentation of RabbitMQ very quickly, but if I
> understand it correctly, then you have to create a subscription to the
> broker, serialize identifier of the subscription into the checkpointmark
> and then just recover the subscription in call to
> UnboundedSource#createReader. That should do the trick.
>
> I have not seen any such documentation in rabbit. My understanding is it
> has to be the same, physical connection and channel. Can you cite the
> source you were looking at?
>
> -Danny
>
> Hope this helps, sorry if I'm not using 100% correct RabbitMQ terminology
> as I said, I'm not quite familiar with it.
>
> Best,
>
>  Jan
> On 11/14/19 5:26 AM, Daniel Robert wrote:
>
> I believe I've nailed down a situation that happens in practice that
> causes Beam and Rabbit to be incompatible. It seems that runners can and do
> make assumptions about the serializability (via Coder) of a CheckpointMark.
>
> To start, these are the semantics of RabbitMQ:
>
> - the client establishes a connection to the server
> - client opens a channel on the connection
> - messages are either pulled or pushed to the client from the server along
> this channel
> - when messages are done processing, they are acknowledged *client-side*
> and must be acknowledged on the *same channel* that originally received the
> message.
>
> Since a channel (or any open connection) is non-serializable, it means
> that a CheckpointMark that has been serialized cannot ever be used to
> acknowledge these messages and correctly 'finalize' the checkpoint. It
> also, as previously discussed in this thread, implies a rabbit Reader
> cannot accept an existing CheckpointMark at all; the Reader and the
> CheckpointMark must share the same connection to the rabbit server
> ("channel").
>
> Next, I've found how DirectRunner (and presumably others) can attempt to
> serialize a CheckpointMark that has not been finalized. In
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
> the DirectRunner applies a probability and if it hits, it sets the current
> reader to 'null' but retains the existing CheckpointMark, which it then
> attempts to pass to a new reader via a Coder.
>
> This puts the shard, the runner, and the reader with differing views of
> the world. In UnboundedReadEvaluatorFactory's processElement function, a
> call to getReader(shard) (
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
> ) clones the shard's checkpoint mark and passes that to the new reader. The
> reader ignores it, creating its own, but even if it accepted it, it would
> be accepting a serialized CheckpointMark, which wouldn't work. Later, the
> runner calls finishRead (
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
> ). The shard's CheckpointMark (unserialized; which should still be valid)
> is finalized. The reader's CheckpointMark (which may be a different
> instance) becomes the return value, which is referred to as
> "finishedCheckpoint" in the calling code, which is misleading at best and
> problematic at worst as *this* checkpoint has not been finalized.
>
> So, tl;dr: I cannot find any means of maintaining a persistent connection
> to the server for finalizing checkpoints that is safe across runners. If
> there's a guarantee all of the shards are on the same JVM instance, I could
> rely on global, static collections/instances as a workaround, but if other
> runners might serialize this across the wire, I'm stumped. The only
> workable situation I can think of right now is to proactively acknowledge
> messages as they are received and effectively no-op in finalizeCheckpoint.
> This is very different, semantically, and can lead to dropped messages if a
> pipeline doesn't finish processing the given message.
>
> Any help would be much appreciated.
>
> Thanks,
> -Danny
> On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>
> Hi Daniel,
>
> This is probably insufficiently well documented. The CheckpointMark is
> used for two purposes:
> 1) To persistently store some notion of how much of the stream has been
> consumed, so that if something fails we can tell the underlying streaming
> system where to start reading when we re-create the reader. This is why
> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
> 2) To do acks - to let the underlying streaming system know that the Beam
> pipeline will never need data up to this CheckpointMark. Acking does not
> require serializability - runners call ack() on the same in-memory instance
> of CheckpointMark that was produced by the reader. E.g. this makes sense
> for RabbitMq or Pubsub.
>
> In practice, these two capabilities tend to be mutually exclusive: some
> streaming systems can provide a serializable CheckpointMark, some can do
> acks, some can do neither - but very few (or none) can do both, and it's
> debatable whether it even makes sense for a system to provide both
> capabilities: usually acking is an implicit form of streaming-system-side
> checkpointing, i.e. when you re-create the reader you don't actually need
> to carry over any information from an old CheckpointMark - the necessary
> state (which records should be delivered) is maintained on the streaming
> system side.
>
> These two are lumped together into one API simply because that was the
> best design option we came up with (not for lack of trying, but suggestions
> very much welcome - AFAIK nobody is happy with it).
>
> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
> can do acks. So you can simply ignore the non-serializability.
>
> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org>
> wrote:
>
> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
> As part of this I switched to a pull-based API rather than the
> previously-used push-based. This has caused some nebulous problems so
> put up a correction PR that I think needs some eyes fairly quickly as
> I'd consider master to be broken for rabbitmq right now. The PR keeps
> the upgrade but reverts to the same push-based implementation as in 4.x:
> https://github.com/apache/beam/pull/9977 )
>
> Regardless, in trying to get the pull-based API to work, I'm finding the
> interactions between rabbitmq and beam with CheckpointMark to be
> fundamentally impossible to implement so I'm hoping for some input here.
>
> CheckointMark itself must be Serializable, presumably this means it gets
> shuffled around between nodes. However 'Channel', the tunnel through
> which it communicates with Rabbit to ack messages and finalize the
> checkpoint, is non-Serializable. Like most other CheckpointMark
> implementations, Channel is 'transient'. When a new CheckpointMark is
> instantiated, it's given a Channel. If an existing one is supplied to
> the Reader's constructor (part of the 'startReader()' interface), the
> channel is overwritten.
>
> *However*, Rabbit does not support 'ack'ing messages on a channel other
> than the one that consumed them in the first place. Attempting to do so
> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>
> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
> ).
>
> Truthfully, I don't really understand how the current implementation is
> working; it seems like a happy accident. But I'm curious if someone
> could help me debug and implement how to bridge the
> re-usable/serializable CheckpointMark requirement in Beam with this
> limitation of Rabbit.
>
> Thanks,
> -Daniel Robert
>
>
>

Reply via email to