Hi,

answers inline.

On 11/14/19 4:15 PM, Daniel Robert wrote:

We may be talking past each other a bit, though I do appreciate the responses.

Rabbit behaves a lot like a relational database in terms of state required. A connection is analogous to a database connection, and a channel (poor analogy here) is similar to an open transaction. If the connection is severed, the transaction will not be able to be committed.

In direct response to the consumer lifecycle linked to, yes, one can recover and re-establish connections, but any state maintained within the previous channel is lost. If there were messages that had not been acknowledged, they would have been re-delivered to some other consumer as they were never acknowledged.

Yes, that is exactly what basically all streaming sources that commit one message at a time (e.g. google pubsub, mqtt, ...) do. That is no problem for Beam, because you have to take into account two things:

 a) if a checkpoint is taken, it is taken in a way that ensures exactly-once processing in downstream operators (that is actually runner dependent, but all major runners behave like that)

 b) some sources might redeliver messages even between checkpoints (for instance due to a message-confirm timeout) - such sources have in common that they commit one message at a time (like rabbit, mqtt, or pubsub). This manifests as the need to override the default implementation of UnboundedSource#requiresDeduping, whose javadoc [1] states: "Returns whether this source requires explicit deduping. This is needed if the underlying data source can return the same record multiple times, such a queuing system with a pull-ack model. Sources where the records read are uniquely identified by the persisted state in the CheckpointMark do not need this."

That is probably exactly your case.

[1] https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/UnboundedSource.html#requiresDeduping--
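
To make point b) a bit more concrete, here is a minimal sketch of what deduplication on a record id amounts to. It is plain Java with no Beam dependency; `Deduper` and `firstDelivery` are hypothetical names I made up for illustration, and the sketch keeps ids forever, which a real runner would not do. In Beam the id would come from UnboundedReader#getCurrentRecordId.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the deduplication a runner performs when a
// source reports that it requires deduping. This is not Beam API.
final class Deduper {
  private final Set<String> seenIds = new HashSet<>();

  // Returns true the first time an id is seen, false for a redelivery.
  boolean firstDelivery(String recordId) {
    return seenIds.add(recordId);
  }
}
```

With this in place a redelivered message is simply dropped before it reaches downstream operators, so redelivery between checkpoints does not break exactly-once processing.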

"Subscription" isn't really the model in rabbit. It has advantages and disadvantages when compared with kafka -- mostly out of scope here -- but some quick advantages of the rabbit model: 1) it parallelizes "infinitely" without any changes to the server (no re-partitioning or the like); 2) messages can be acknowledged in a different order than they were consumed; 3) because state is associated with an active connection, at-least-once delivery semantics are easy to implement, as any disconnection will result in the messages being re-placed in the queue and delivered to a new consumer. To say it's "incompatible with any fault tolerant semantics" is unfair; they just aren't compatible with Beam's, as Beam is currently implemented.

What I mean by that is: "if rabbit were not able to recover a 'subscription' in at least at-least-once fashion, then it would be at best at-most-once and thus not fault tolerant". I was pretty sure that is not the case.

Regardless, I'm now wondering what the best path forward is. Rabbit isn't unusable in Beam if the set of requirements and tradeoffs is well documented. That is, there are use cases that could be properly supported and some that likely can't.

I would really say that rabbit can be fully supported by Beam. Maybe the best analogy would be PubSubIO.

One option would be to use a pull-based API and immediately acknowledge each message as it arrives. This would effectively make the CheckpointMark a no-op, other than maintaining the watermark. In a pipeline that uses fixed windows (or non-session windowing) and uses a runner that supports 'Drain'-style semantics (like Dataflow) this should work just fine I think.

That would make the source not fault tolerant, because messages could not be redelivered.
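
A small sketch of why, under simplifying assumptions: `ToyQueue` and `AckTiming` below are a hypothetical in-memory model of a broker queue, not the RabbitMQ client API. The model only captures that unacknowledged messages go back to the queue when the consumer's connection is lost, and that acknowledged ones are gone for good.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of a broker queue; not the RabbitMQ client API.
final class ToyQueue {
  private final Deque<String> ready = new ArrayDeque<>();
  private final Map<Long, String> unacked = new LinkedHashMap<>();
  private long nextTag = 1;

  void publish(String body) {
    ready.addLast(body);
  }

  // Delivers the next message and returns its delivery tag, or -1 if empty.
  long deliver() {
    String body = ready.pollFirst();
    if (body == null) {
      return -1;
    }
    long tag = nextTag++;
    unacked.put(tag, body);
    return tag;
  }

  void ack(long tag) {
    unacked.remove(tag);
  }

  // Consumer connection dropped: everything unacknowledged is requeued.
  void connectionLost() {
    ready.addAll(unacked.values());
    unacked.clear();
  }

  int readyCount() {
    return ready.size();
  }
}

final class AckTiming {
  // Delivers one message, then simulates a crash before processing finishes.
  // Returns how many messages are available for redelivery afterwards.
  static int redeliverableAfterCrash(boolean ackOnReceipt) {
    ToyQueue queue = new ToyQueue();
    queue.publish("order-1");
    long tag = queue.deliver();
    if (ackOnReceipt) {
      queue.ack(tag); // acknowledged before the pipeline has processed it
    }
    queue.connectionLost(); // the worker dies mid-processing
    return queue.readyCount();
  }
}
```

In this model `AckTiming.redeliverableAfterCrash(true)` leaves nothing to redeliver, so the message is lost, while `AckTiming.redeliverableAfterCrash(false)` leaves the message in the queue for the next consumer.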

Another would be to do a best-attempt at acknowledging as late as possible. This would be a hybrid approach where we attempt acknowledgements in the CheckpointMark, but use a special Coder that acknowledges all messages at the point the CheckpointMark is encoded. I think this feels a bit unsafe and overly complex, and I'm not sure it solves any real-world problems.

I also feel like perhaps we should include Beam IO documentation that makes it clear that an unbounded source that requires a persistent connection for state tracking is not supportable by beam.

Thanks,
-Danny

On 11/14/19 7:49 AM, Jan Lukavský wrote:
Hi, as I said, I didn't dig too deep into that, but what I saw was [1].
Generally, if RabbitMQ had no way to recover a subscription (which I don't think is the case), then it would be incompatible not just with Beam, but with any fault tolerant semantics.

[1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle

On 14 Nov 2019 at 13:06, Daniel Robert <daniel.rob...@acm.org> wrote:


    On 11/14/19 2:32 AM, Jan Lukavský wrote:

        Hi Danny,

        as Eugene pointed out, there are essentially two "modes of
        operation" of CheckpointMark. It can:

         a) be used to somehow restore state of a reader (in call to
        UnboundedSource#createReader)

         b) confirm processed elements in
        CheckpointMark#finalizeCheckpoint

        If your source doesn't provide a persistent position in data
        stream that can be referred to (and serialized - example of
        this would be kafka offsets), then what you actually need to
        serialize is not the channel, but a way to restore it -
        e.g. by opening a new channel with a given 'consumer group
        name'. Then you just use this checkpoint to commit your
        processed data in finalizeCheckpoint.

        Note that finalizeCheckpoint is not guaranteed to be
        called - that can happen in cases when an error occurs and
        the source has to be rewound - that is what the direct runner
        emulates with the probability of 'readerReuseChance'.

        I'm reading the documentation of RabbitMQ very quickly, but
        if I understand it correctly, then you have to create a
        subscription to the broker, serialize the identifier of the
        subscription into the CheckpointMark and then just recover
        the subscription in the call to UnboundedSource#createReader.
        That should do the trick.

    I have not seen any such documentation in rabbit. My
    understanding is it has to be the same, physical connection and
    channel. Can you cite the source you were looking at?

    -Danny

        Hope this helps, sorry if I'm not using 100% correct RabbitMQ
        terminology; as I said, I'm not quite familiar with it.

        Best,

         Jan

        On 11/14/19 5:26 AM, Daniel Robert wrote:

            I believe I've nailed down a situation that happens in
            practice that causes Beam and Rabbit to be incompatible.
            It seems that runners can and do make assumptions about
            the serializability (via Coder) of a CheckpointMark.

            To start, these are the semantics of RabbitMQ:

            - the client establishes a connection to the server
            - client opens a channel on the connection
            - messages are either pulled or pushed to the client from
            the server along this channel
            - when messages are done processing, they are
            acknowledged *client-side* and must be acknowledged on
            the *same channel* that originally received the message.
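
To illustrate the last point, here is a minimal sketch of channel-scoped delivery tags. `ToyChannel` is a hypothetical in-memory model, not the RabbitMQ client API; it only captures the rule that a tag is valid solely on the channel that issued it, and the exception message imitates the broker's "unknown delivery tag" error.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of channel-scoped delivery tags; not the RabbitMQ client API.
final class ToyChannel {
  private final Map<Long, String> unacked = new HashMap<>();
  private long nextTag = 1;

  // Records a delivery on this channel and returns its channel-local tag.
  long deliver(String body) {
    long tag = nextTag++;
    unacked.put(tag, body);
    return tag;
  }

  // Acks a delivery; fails if the tag is not outstanding on this channel,
  // which is the situation the "unknown delivery tag" error describes.
  void ack(long tag) {
    if (unacked.remove(tag) == null) {
      throw new IllegalStateException(
          "PRECONDITION_FAILED - unknown delivery tag " + tag);
    }
  }

  int unackedCount() {
    return unacked.size();
  }
}
```

A tag obtained from one `ToyChannel` and passed to `ack` on another throws, while acking it on the original channel succeeds. That is why a checkpoint that outlives its channel has nothing left to acknowledge with.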

            Since a channel (or any open connection) is
            non-serializable, it means that a CheckpointMark that has
            been serialized cannot ever be used to acknowledge these
            messages and correctly 'finalize' the checkpoint. It
            also, as previously discussed in this thread, implies a
            rabbit Reader cannot accept an existing CheckpointMark at
            all; the Reader and the CheckpointMark must share the
            same connection to the rabbit server ("channel").

            Next, I've found how DirectRunner (and presumably others)
            can attempt to serialize a CheckpointMark that has not
            been finalized. In
            https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
            the DirectRunner applies a probability and if it hits, it
            sets the current reader to 'null' but retains the
            existing CheckpointMark, which it then attempts to pass
            to a new reader via a Coder.

            This puts the shard, the runner, and the reader with
            differing views of the world. In
            UnboundedReadEvaluatorFactory's processElement function,
            a call to getReader(shard)
            (https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132)
            clones the shard's checkpoint mark and passes that to
            the new reader. The reader ignores it, creating its own,
            but even if it accepted it, it would be accepting a
            serialized CheckpointMark, which wouldn't work. Later,
            the runner calls finishRead
            (https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246).
            The shard's CheckpointMark (unserialized; which should
            still be valid) is finalized. The reader's CheckpointMark
            (which may be a different instance) becomes the return
            value, which is referred to as "finishedCheckpoint" in
            the calling code, which is misleading at best and
            problematic at worst as *this* checkpoint has not been
            finalized.

            So, tl;dr: I cannot find any means of maintaining a
            persistent connection to the server for finalizing
            checkpoints that is safe across runners. If there's a
            guarantee all of the shards are on the same JVM instance,
            I could rely on global, static collections/instances as a
            workaround, but if other runners might serialize this
            across the wire, I'm stumped. The only workable situation
            I can think of right now is to proactively acknowledge
            messages as they are received and effectively no-op in
            finalizeCheckpoint. This is very different, semantically,
            and can lead to dropped messages if a pipeline doesn't
            finish processing the given message.

            Any help would be much appreciated.

            Thanks,
            -Danny

            On 11/7/19 10:27 PM, Eugene Kirpichov wrote:

                Hi Daniel,

                This is probably insufficiently well documented. The
                CheckpointMark is used for two purposes:
                1) To persistently store some notion of how much of
                the stream has been consumed, so that if something
                fails we can tell the underlying streaming system
                where to start reading when we re-create the reader.
                This is why CheckpointMark is Serializable. E.g. this
                makes sense for Kafka.
                2) To do acks - to let the underlying streaming
                system know that the Beam pipeline will never need
                data up to this CheckpointMark. Acking does not
                require serializability - runners call ack() on the
                same in-memory instance of CheckpointMark that was
                produced by the reader. E.g. this makes sense for
                RabbitMq or Pubsub.

                In practice, these two capabilities tend to be
                mutually exclusive: some streaming systems can
                provide a serializable CheckpointMark, some can do
                acks, some can do neither - but very few (or none)
                can do both, and it's debatable whether it even makes
                sense for a system to provide both capabilities:
                usually acking is an implicit form of
                streaming-system-side checkpointing, i.e. when you
                re-create the reader you don't actually need to carry
                over any information from an old CheckpointMark - the
                necessary state (which records should be delivered)
                is maintained on the streaming system side.

                These two are lumped together into one API simply
                because that was the best design option we came up
                with (not for lack of trying, but suggestions very
                much welcome - AFAIK nobody is happy with it).

                RabbitMQ is under #2 - it can't do serializable
                checkpoint marks, but it can do acks. So you can
                simply ignore the non-serializability.
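
A minimal sketch of what "ignore the non-serializability" could look like for an ack-only checkpoint mark. Everything here is hypothetical and for illustration: `FakeChannel` stands in for a broker channel, and `AckOnlyCheckpointMark` only mirrors the shape of CheckpointMark#finalizeCheckpoint without implementing the Beam interface. The assumption is that a copy which has been through serialization has lost its channel and therefore does nothing, leaving redelivery to the broker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a broker channel; deliberately not Serializable.
final class FakeChannel {
  final List<Long> acked = new ArrayList<>();

  void ack(long deliveryTag) {
    acked.add(deliveryTag);
  }
}

// Sketch of an ack-only checkpoint mark (does not implement Beam's interface).
final class AckOnlyCheckpointMark implements Serializable {
  private static final long serialVersionUID = 1L;

  private final List<Long> pendingTags = new ArrayList<>();
  private transient FakeChannel channel; // dropped on serialization

  AckOnlyCheckpointMark(FakeChannel channel) {
    this.channel = channel;
  }

  void add(long deliveryTag) {
    pendingTags.add(deliveryTag);
  }

  // Acks pending messages on the original channel; returns how many were acked.
  int finalizeCheckpoint() {
    if (channel == null) {
      return 0; // deserialized copy: cannot ack, the broker will redeliver
    }
    int count = pendingTags.size();
    for (long tag : pendingTags) {
      channel.ack(tag);
    }
    pendingTags.clear();
    return count;
  }

  // Java-serialization round trip, standing in for a runner encoding the mark.
  AckOnlyCheckpointMark roundTrip() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(this);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (AckOnlyCheckpointMark) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Finalizing the in-memory instance acks on the channel that received the messages; finalizing a round-tripped copy is a no-op. Whether silently doing nothing is the right behaviour for the copy is a design choice, and it relies on the broker redelivering whatever was never acknowledged.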

                On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
                <daniel.rob...@acm.org> wrote:

                    (Background: I recently upgraded RabbitMqIO from
                    the 4.x to 5.x library.
                    As part of this I switched to a pull-based API
                    rather than the
                    previously-used push-based. This has caused some
                    nebulous problems so
                    I put up a correction PR that I think needs some
                    eyes fairly quickly as
                    I'd consider master to be broken for rabbitmq
                    right now. The PR keeps
                    the upgrade but reverts to the same push-based
                    implementation as in 4.x:
                    https://github.com/apache/beam/pull/9977 )

                    Regardless, in trying to get the pull-based API
                    to work, I'm finding the
                    interactions between rabbitmq and beam with
                    CheckpointMark to be
                    fundamentally impossible to implement so I'm
                    hoping for some input here.

                    CheckpointMark itself must be Serializable,
                    presumably this means it gets
                    shuffled around between nodes. However 'Channel',
                    the tunnel through
                    which it communicates with Rabbit to ack messages
                    and finalize the
                    checkpoint, is non-Serializable. Like most other
                    CheckpointMark
                    implementations, Channel is 'transient'. When a
                    new CheckpointMark is
                    instantiated, it's given a Channel. If an
                    existing one is supplied to
                    the Reader's constructor (part of the
                    'createReader()' interface), the
                    channel is overwritten.

                    *However*, Rabbit does not support 'ack'ing
                    messages on a channel other
                    than the one that consumed them in the first
                    place. Attempting to do so
                    results in a '406 (PRECONDITION-FAILED) - unknown
                    delivery tag'. (See
                    https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
                    ).

                    Truthfully, I don't really understand how the
                    current implementation is
                    working; it seems like a happy accident. But I'm
                    curious if someone
                    could help me debug and implement how to bridge the
                    re-usable/serializable CheckpointMark requirement
                    in Beam with this
                    limitation of Rabbit.

                    Thanks,
                    -Daniel Robert

