Just as a matter of curiosity, I wonder why it would be needed to assign a (local) UUIDs to RabbitMQ streams. There seem to be only two options:

 a) RabbitMQ does not support restore of client connection (this is valid, many sources work like that, e.g. plain websocket, or UDP stream)

 b) it does support that (and according to the documentation and overall logic it seems it should)

if a) is true, then the source itself is at most once and there is actually pretty nothing we can do about that.

If b) is true, then there has to be some sort of identifier of the "subscriber", "stream", "session", "consumer", whatever we call it. This identifier should be serializable and transferable to different machine. Every other option leads to (in my point of view) non-sensical conclusions (after all, that's why web browsers have cookies and all of us can enjoy the pleasant view of popups asking us to accept this, right? :))

If there is a serializable identifier of the "stream", "session", "consumer", "subscriber", then the stream could be just recreated in call to UnboundedSource#createReader().

One more not yet mentioned point - the logic in DirectRunner associated with 'readerReuseChance' (assigning reader a null value under some probability distribution) is just an emulation of faults in a real system.

Jan

On 11/14/19 7:21 PM, Reuven Lax wrote:
Just a thought: instead of embedding the RabbitMQ streams inside the checkpoint mark, could you keep a global static map of RabbitMQ streams keyed by a unique UUID. Then all you have to serialize inside the CheckpointMark is the UUID; you can look up the actual stream in the constructor of the CheckpointMark and cache it in a volatile variable that won't be serialized.

This does mean that if the source shard starts up on a new machine (this will happen after a crash or if a runner load balances to another machine) then you cannot recover the same RabbitMQ stream. I presume (hope!) that RabbitMQ must have some sort ack timeout and will redeliver the messages after a while. In this case those messages will get "stuck" until RabbitMQ redelivers them, but will eventually show up again on the new RabbitMQ stream. (I hope that opening a new stream would not redeliver messages that had already been successfully acked on the previous stream).

Would this work?

Reuven

On Thu, Nov 14, 2019 at 7:16 AM Daniel Robert <daniel.rob...@acm.org <mailto:daniel.rob...@acm.org>> wrote:

    We may be talking past each other a bit, though I do appreciate
    the responses.

    Rabbit behaves a lot like a relational database in terms of state
    required. A connection is analogous to a database connection, and
    a channel (poor analogy here) is similar to an open transaction.
    If the connection is severed, the transaction will not be able to
    be committed.

    In direct response to the consumer lifecycle linked to, yes, one
    can recover and re-establish connections, but any state maintained
    within the previous channel are lost. If there were messages that
    had not been acknowledged, they would have been re-delivered to
    some other consumer as they were never acknowledged.

    "Subscription" isn't really the model in rabbit. It has advantages
    and disadvantages when compared with kafka -- mostly out of scope
    here -- but some quick advantages of the rabbit model: 1) it
    parallelizes "infinitely" without any changes to server (no
    re-partitioning or the like); 2) messages can be acknowledge in a
    separate order than they were consumed; 3) because state is
    managed associated with an active connection, at-least-once
    delivery semantics are easy to implement as any disconnection will
    result in the messages being re-placed in the queue and delivered
    to a new consumer. To say it's "incompatible with any fault
    tolerant semantics" is unfair, they just aren't incompatible with
    Beam's, as Beam is currently implemented.

    Regardless, I'm now wondering what the best path forward is.
    Rabbit isn't unusable in Beam if the set of requirements and
    tradeoffs are well documented. That is, there are use cases that
    could be properly supported and some that likely can't.

    One option would be to use a pull-based api and immediately
    acknowledge each message as they arrive. This would effectively
    make the CheckpointMark a no-op, other than maintaining the
    watermark. In a pipeline that uses fixed windows (or non-session
    windowing) and uses a runner that supports 'Drain'-style semantics
    (like Dataflow) this should work just fine I think.

    Another would be to do a best-attempt at acknowledging as late as
    possible. This would be a hybrid approach where we attempt
    acknowledgements in the CheckpointMark, but use a special Coder
    that acknowledges all messages at the point the CheckpointMark is
    encoded. I think this feels a bit unsafe and overly complex, and
    I'm not sure it solves any real-world problems.

    I also feel like perhaps we should include Beam IO documentation
    that makes it clear that an unbounded source that requires a
    persistent connection for state tracking is not supportable by beam.

    Thanks,
    -Danny

    On 11/14/19 7:49 AM, Jan Lukavský wrote:
    Hi, as I said, I didn't dig too deep into that, but what I saw
    was [1].
    Generally, if RabbitMQ would have no way to recover subscription
    (which I don't think is the case), then it would not be
    incompatible with beam, but actually with would be incompatible
    any fault tolerant semantics.

    [1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle

    Dne 14. 11. 2019 13:06 napsal uživatel Daniel Robert
    <daniel.rob...@acm.org> <mailto:daniel.rob...@acm.org>:


        On 11/14/19 2:32 AM, Jan Lukavský wrote:

            Hi Danny,

            as Eugene pointed out, there are essentially two "modes
            of operation" of CheckpointMark. It can:

             a) be used to somehow restore state of a reader (in call
            to UnboundedSource#createReader)

             b) confirm processed elements in
            CheckpointMark#finalizeCheckpoint

            If your source doesn't provide a persistent position in
            data stream that can be referred to (and serialized -
            example of this would be kafka offsets), then what you
            actually need to serialize is not the channel, but a way
            how to restore it - e.g. by opening a new channel with a
            given 'consumer group name'. Then you just use this
            checkpoint to commit your processed data in
            finalizeCheckpoint.

            Note that the finalizeCheckpoint is not guaranteed to be
            called - that can happen in cases when an error occurs
            and the source has to be rewind back - that is what
            direct runner emulates with the probability of
            'readerReuseChance'.

            I'm reading the documentation of RabbitMQ very quickly,
            but if I understand it correctly, then you have to create
            a subscription to the broker, serialize identifier of the
            subscription into the checkpointmark and then just
            recover the subscription in call to
            UnboundedSource#createReader. That should do the trick.

        I have not seen any such documentation in rabbit. My
        understanding is it has to be the same, physical connection
        and channel. Can you cite the source you were looking at?

        -Danny

            Hope this helps, sorry if I'm not using 100% correct
            RabbitMQ terminology as I said, I'm not quite familiar
            with it.

            Best,

             Jan

            On 11/14/19 5:26 AM, Daniel Robert wrote:

                I believe I've nailed down a situation that happens
                in practice that causes Beam and Rabbit to be
                incompatible. It seems that runners can and do make
                assumptions about the serializability (via Coder) of
                a CheckpointMark.

                To start, these are the semantics of RabbitMQ:

                - the client establishes a connection to the server
                - client opens a channel on the connection
                - messages are either pulled or pushed to the client
                from the server along this channel
                - when messages are done processing, they are
                acknowledged *client-side* and must be acknowledged
                on the *same channel* that originally received the
                message.

                Since a channel (or any open connection) is
                non-serializable, it means that a CheckpointMark that
                has been serialized cannot ever be used to
                acknowledge these messages and correctly 'finalize'
                the checkpoint. It also, as previously discussed in
                this thread, implies a rabbit Reader cannot accept an
                existing CheckpointMark at all; the Reader and the
                CheckpointMark must share the same connection to the
                rabbit server ("channel").

                Next, I've found how DirectRunner (and presumably
                others) can attempt to serialize a CheckpointMark
                that has not been finalized. In
                
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
                the DirectRunner applies a probability and if it
                hits, it sets the current reader to 'null' but
                retains the existing CheckpointMark, which it then
                attempts to pass to a new reader via a Coder.

                This puts the shard, the runner, and the reader with
                differing views of the world. In
                UnboundedReadEvaluatorFactory's processElement
                function, a call to getReader(shard) (
                
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
                ) clones the shard's checkpoint mark and passes that
                to the new reader. The reader ignores it, creating
                its own, but even if it accepted it, it would be
                accepting a serialized CheckpointMark, which wouldn't
                work. Later, the runner calls finishRead (
                
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
                ). The shard's CheckpointMark (unserialized; which
                should still be valid) is finalized. The reader's
                CheckpointMark (which may be a different instance)
                becomes the return value, which is referred to as
                "finishedCheckpoint" in the calling code, which is
                misleading at best and problematic at worst as *this*
                checkpoint has not been finalized.

                So, tl;dr: I cannot find any means of maintaining a
                persistent connection to the server for finalizing
                checkpoints that is safe across runners. If there's a
                guarantee all of the shards are on the same JVM
                instance, I could rely on global, static
                collections/instances as a workaround, but if other
                runners might serialize this across the wire, I'm
                stumped. The only workable situation I can think of
                right now is to proactively acknowledge messages as
                they are received and effectively no-op in
                finalizeCheckpoint. This is very different,
                semantically, and can lead to dropped messages if a
                pipeline doesn't finish processing the given message.

                Any help would be much appreciated.

                Thanks,
                -Danny

                On 11/7/19 10:27 PM, Eugene Kirpichov wrote:

                    Hi Daniel,

                    This is probably insufficiently well documented.
                    The CheckpointMark is used for two purposes:
                    1) To persistently store some notion of how much
                    of the stream has been consumed, so that if
                    something fails we can tell the underlying
                    streaming system where to start reading when we
                    re-create the reader. This is why CheckpointMark
                    is Serializable. E.g. this makes sense for Kafka.
                    2) To do acks - to let the underlying streaming
                    system know that the Beam pipeline will never
                    need data up to this CheckpointMark. Acking does
                    not require serializability - runners call ack()
                    on the same in-memory instance of CheckpointMark
                    that was produced by the reader. E.g. this makes
                    sense for RabbitMq or Pubsub.

                    In practice, these two capabilities tend to be
                    mutually exclusive: some streaming systems can
                    provide a serializable CheckpointMark, some can
                    do acks, some can do neither - but very few (or
                    none) can do both, and it's debatable whether it
                    even makes sense for a system to provide both
                    capabilities: usually acking is an implicit form
                    of streaming-system-side checkpointing, i.e. when
                    you re-create the reader you don't actually need
                    to carry over any information from an old
                    CheckpointMark - the necessary state (which
                    records should be delivered) is maintained on the
                    streaming system side.

                    These two are lumped together into one API simply
                    because that was the best design option we came
                    up with (not for lack of trying, but suggestions
                    very much welcome - AFAIK nobody is happy with it).

                    RabbitMQ is under #2 - it can't do serializable
                    checkpoint marks, but it can do acks. So you can
                    simply ignore the non-serializability.

                    On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
                    <daniel.rob...@acm.org
                    <mailto:daniel.rob...@acm.org>> wrote:

                        (Background: I recently upgraded RabbitMqIO
                        from the 4.x to 5.x library.
                        As part of this I switched to a pull-based
                        API rather than the
                        previously-used push-based. This has caused
                        some nebulous problems so
                        put up a correction PR that I think needs
                        some eyes fairly quickly as
                        I'd consider master to be broken for rabbitmq
                        right now. The PR keeps
                        the upgrade but reverts to the same
                        push-based implementation as in 4.x:
                        https://github.com/apache/beam/pull/9977 )

                        Regardless, in trying to get the pull-based
                        API to work, I'm finding the
                        interactions between rabbitmq and beam with
                        CheckpointMark to be
                        fundamentally impossible to implement so I'm
                        hoping for some input here.

                        CheckointMark itself must be Serializable,
                        presumably this means it gets
                        shuffled around between nodes. However
                        'Channel', the tunnel through
                        which it communicates with Rabbit to ack
                        messages and finalize the
                        checkpoint, is non-Serializable. Like most
                        other CheckpointMark
                        implementations, Channel is 'transient'. When
                        a new CheckpointMark is
                        instantiated, it's given a Channel. If an
                        existing one is supplied to
                        the Reader's constructor (part of the
                        'startReader()' interface), the
                        channel is overwritten.

                        *However*, Rabbit does not support 'ack'ing
                        messages on a channel other
                        than the one that consumed them in the first
                        place. Attempting to do so
                        results in a '406 (PRECONDITION-FAILED) -
                        unknown delivery tag'. (See
                        
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed

                        ).

                        Truthfully, I don't really understand how the
                        current implementation is
                        working; it seems like a happy accident. But
                        I'm curious if someone
                        could help me debug and implement how to
                        bridge the
                        re-usable/serializable CheckpointMark
                        requirement in Beam with this
                        limitation of Rabbit.

                        Thanks,
                        -Daniel Robert


Reply via email to