Thanks Euguene and Reuven.

In response to Eugene, I'd like to confirm I have this correct: In the rabbit-style use case of "stream-system-side checkpointing", it is safe (and arguably the correct behavior) to ignore the supplied CheckpointMark argument in `createReader(options, checkpointmark)` and in the constructor for the and instead always instantiate a new CheckpointMark during construction. Is that correct?

In response to Reuven: noted, however I was mostly using serialization in the general sense. That is, there does not seem to be any means of deserializing a RabbitMqCheckpointMark such that it can continue to provide value to a runner. Whether it's java serialization, avro, or any other Coder, the 'channel' itself cannot "come along for the ride", which leaves the rest of the internal state mostly unusable except for perhaps some historical, immutable use case.

-Danny

On 11/8/19 2:01 AM, Reuven Lax wrote:
Just to clarify one thing: CheckpointMark does not need to be Java Seralizable. All that's needed is do return a Coder for the CheckpointMark in getCheckpointMarkCoder.

On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov <j...@google.com <mailto:j...@google.com>> wrote:

    Hi Daniel,

    This is probably insufficiently well documented. The
    CheckpointMark is used for two purposes:
    1) To persistently store some notion of how much of the stream has
    been consumed, so that if something fails we can tell the
    underlying streaming system where to start reading when we
    re-create the reader. This is why CheckpointMark is Serializable.
    E.g. this makes sense for Kafka.
    2) To do acks - to let the underlying streaming system know that
    the Beam pipeline will never need data up to this CheckpointMark.
    Acking does not require serializability - runners call ack() on
    the same in-memory instance of CheckpointMark that was produced by
    the reader. E.g. this makes sense for RabbitMq or Pubsub.

    In practice, these two capabilities tend to be mutually exclusive:
    some streaming systems can provide a serializable CheckpointMark,
    some can do acks, some can do neither - but very few (or none) can
    do both, and it's debatable whether it even makes sense for a
    system to provide both capabilities: usually acking is an implicit
    form of streaming-system-side checkpointing, i.e. when you
    re-create the reader you don't actually need to carry over any
    information from an old CheckpointMark - the necessary state
    (which records should be delivered) is maintained on the streaming
    system side.

    These two are lumped together into one API simply because that was
    the best design option we came up with (not for lack of trying,
    but suggestions very much welcome - AFAIK nobody is happy with it).

    RabbitMQ is under #2 - it can't do serializable checkpoint marks,
    but it can do acks. So you can simply ignore the non-serializability.

    On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
    <daniel.rob...@acm.org <mailto:daniel.rob...@acm.org>> wrote:

        (Background: I recently upgraded RabbitMqIO from the 4.x to
        5.x library.
        As part of this I switched to a pull-based API rather than the
        previously-used push-based. This has caused some nebulous
        problems so
        put up a correction PR that I think needs some eyes fairly
        quickly as
        I'd consider master to be broken for rabbitmq right now. The
        PR keeps
        the upgrade but reverts to the same push-based implementation
        as in 4.x:
        https://github.com/apache/beam/pull/9977 )

        Regardless, in trying to get the pull-based API to work, I'm
        finding the
        interactions between rabbitmq and beam with CheckpointMark to be
        fundamentally impossible to implement so I'm hoping for some
        input here.

        CheckointMark itself must be Serializable, presumably this
        means it gets
        shuffled around between nodes. However 'Channel', the tunnel
        through
        which it communicates with Rabbit to ack messages and finalize
        the
        checkpoint, is non-Serializable. Like most other CheckpointMark
        implementations, Channel is 'transient'. When a new
        CheckpointMark is
        instantiated, it's given a Channel. If an existing one is
        supplied to
        the Reader's constructor (part of the 'startReader()'
        interface), the
        channel is overwritten.

        *However*, Rabbit does not support 'ack'ing messages on a
        channel other
        than the one that consumed them in the first place. Attempting
        to do so
        results in a '406 (PRECONDITION-FAILED) - unknown delivery
        tag'. (See
        
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed

        ).

        Truthfully, I don't really understand how the current
        implementation is
        working; it seems like a happy accident. But I'm curious if
        someone
        could help me debug and implement how to bridge the
        re-usable/serializable CheckpointMark requirement in Beam with
        this
        limitation of Rabbit.

        Thanks,
        -Daniel Robert

Reply via email to