Thanks Eugene and Reuven.
In response to Eugene, I'd like to confirm I have this correct: in the
rabbit-style use case of "stream-system-side checkpointing", it is safe
(and arguably the correct behavior) to ignore the CheckpointMark argument
supplied to `createReader(options, checkpointmark)` and to the reader's
constructor, and instead always instantiate a new CheckpointMark during
construction. Is that correct?
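A minimal Python model of the behaviour being asked about. It assumes hypothetical `Channel`, `AckCheckpointMark`, `Reader` and `create_reader` stand-ins for the Java RabbitMqIO classes, and none of these are real Beam or RabbitMQ APIs. The restored mark is accepted but never read, so every reader starts with a fresh mark bound to its own channel.

```python
import itertools

_channel_ids = itertools.count(1)


class Channel:
    """Hypothetical stand-in for a RabbitMQ channel (not the real client API)."""

    def __init__(self):
        self.id = next(_channel_ids)


class AckCheckpointMark:
    """Hypothetical ack-style mark, only meaningful together with its own channel."""

    def __init__(self, channel):
        self.channel = channel
        self.delivery_tags = []  # filled in as the reader consumes messages


class Reader:
    def __init__(self, channel):
        self.channel = channel
        # Always start from a fresh mark bound to this reader's own channel.
        self.checkpoint_mark = AckCheckpointMark(channel)


def create_reader(options, checkpoint_mark=None):
    # checkpoint_mark is accepted to mirror createReader(options, checkpointmark)
    # but deliberately ignored: in this model the broker, not the mark, tracks
    # which messages are still unacked and redelivers them to a new channel.
    return Reader(Channel())
```

This only holds under the assumption that the broker, rather than the mark, remembers which messages were never acked.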
In response to Reuven: noted. However, I was using "serialization" in
the general sense. That is, there does not seem to be any way of
deserializing a RabbitMqCheckpointMark such that it can continue to
provide value to a runner. Whether it's Java serialization, Avro, or any
other Coder, the 'channel' itself cannot "come along for the ride",
which leaves the rest of the internal state mostly unusable except
perhaps for some historical, immutable use case.
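A minimal Python sketch of this point, written as a hypothetical model and not RabbitMqIO's code. Dropping the channel in `__getstate__` plays the role of Java's `transient`. The decoded mark keeps its delivery tags, but it has nothing left to ack them with.

```python
import pickle


class Channel:
    """Hypothetical live channel handle; like a socket, it cannot be serialized."""

    def __init__(self):
        self.acked = []

    def ack(self, tag):
        self.acked.append(tag)

    def __reduce__(self):
        raise TypeError("a live channel cannot be serialized")


class AckCheckpointMark:
    """Hypothetical ack-style mark, loosely modelled on RabbitMqCheckpointMark."""

    def __init__(self, channel, delivery_tags):
        self.channel = channel  # plays the role of a Java 'transient' field
        self.delivery_tags = list(delivery_tags)

    def __getstate__(self):
        state = self.__dict__.copy()
        state["channel"] = None  # dropped on encode, as 'transient' would be
        return state

    def finalize_checkpoint(self):
        if self.channel is None:
            raise RuntimeError("channel was not restored; these tags cannot be acked")
        for tag in self.delivery_tags:
            self.channel.ack(tag)
```

Usage: `pickle.loads(pickle.dumps(mark))` returns a mark with the same `delivery_tags` and `channel` set to `None`. Calling `finalize_checkpoint()` on it raises, while the original in-memory mark can still ack.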
-Danny
On 11/8/19 2:01 AM, Reuven Lax wrote:
Just to clarify one thing: CheckpointMark does not need to be Java
Serializable. All that's needed is to return a Coder for the
CheckpointMark in getCheckpointMarkCoder.
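A hypothetical sketch of what such a coder could look like. It is written in Python with an `encode`/`decode` pair shaped like a Beam `Coder`, and the class names and wire format are assumptions for illustration. Any coder can carry the delivery tags, but the live channel is simply not part of what gets encoded.

```python
import struct


class AckCheckpointMark:
    """Hypothetical ack-style mark; 'channel' is a live handle no coder can encode."""

    def __init__(self, delivery_tags, channel=None):
        self.delivery_tags = list(delivery_tags)
        self.channel = channel


class AckCheckpointMarkCoder:
    """Hypothetical coder with an encode/decode pair, shaped like a Beam Coder.

    Wire format (an assumption): a 4-byte big-endian count, then one 8-byte
    unsigned big-endian integer per delivery tag. The channel is not encoded.
    """

    def encode(self, mark):
        tags = mark.delivery_tags
        return struct.pack(f">I{len(tags)}Q", len(tags), *tags)

    def decode(self, data):
        (count,) = struct.unpack_from(">I", data, 0)
        tags = struct.unpack_from(f">{count}Q", data, 4)
        return AckCheckpointMark(tags)  # the decoded mark has no channel
```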
On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov <j...@google.com> wrote:
Hi Daniel,
This is probably insufficiently well documented. The
CheckpointMark is used for two purposes:
1) To persistently store some notion of how much of the stream has
been consumed, so that if something fails we can tell the
underlying streaming system where to start reading when we
re-create the reader. This is why CheckpointMark is Serializable.
E.g. this makes sense for Kafka.
2) To do acks - to let the underlying streaming system know that
the Beam pipeline will never need data up to this CheckpointMark.
Acking does not require serializability - runners call ack() on
the same in-memory instance of CheckpointMark that was produced by
the reader. E.g. this makes sense for RabbitMq or Pubsub.
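The two purposes can be contrasted in a small Python model. It assumes a hypothetical in-memory log and broker, and the class and method names are illustrative, not Beam's API. The offset mark is plain data that a new reader can resume from. The acking mark is only useful as the same in-memory object whose `finalize_checkpoint` the runner calls.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OffsetCheckpointMark:
    """Purpose 1 (Kafka-like): plain data recording how far the stream was read."""
    next_offset: int


class OffsetReader:
    """Hypothetical reader over an append-only log, modelled as a Python list."""

    def __init__(self, log, mark=None):
        self.log = log
        # Resume from a restored mark if there is one, else from the start.
        self.position = mark.next_offset if mark is not None else 0

    def read(self):
        record = self.log[self.position]
        self.position += 1
        return record

    def checkpoint_mark(self):
        return OffsetCheckpointMark(self.position)


class FakeBroker:
    """Hypothetical broker that just records which delivery tags were acked."""

    def __init__(self):
        self.acked = []

    def ack(self, tag):
        self.acked.append(tag)


class AckingCheckpointMark:
    """Purpose 2 (RabbitMQ/Pub/Sub-like): holds a live handle and acks on finalize."""

    def __init__(self, broker, delivery_tags):
        self.broker = broker
        self.delivery_tags = list(delivery_tags)

    def finalize_checkpoint(self):
        # Called on the same in-memory instance the reader produced,
        # so no serialization is involved.
        for tag in self.delivery_tags:
            self.broker.ack(tag)
```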
In practice, these two capabilities tend to be mutually exclusive:
some streaming systems can provide a serializable CheckpointMark,
some can do acks, some can do neither - but very few (or none) can
do both, and it's debatable whether it even makes sense for a
system to provide both capabilities: usually acking is an implicit
form of streaming-system-side checkpointing, i.e. when you
re-create the reader you don't actually need to carry over any
information from an old CheckpointMark - the necessary state
(which records should be delivered) is maintained on the streaming
system side.
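A minimal Python model of acking as streaming-system-side checkpointing. It assumes a hypothetical `Broker` with RabbitMQ-like semantics: delivery tags are numbered per channel, and messages that were delivered but never acked are requeued when their channel closes. A replacement reader needs no information from the old CheckpointMark to pick up where the old one left off.

```python
from collections import deque


class Broker:
    """Hypothetical in-memory queue with RabbitMQ-like ack/requeue semantics."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.unacked = {}   # channel id -> {delivery tag: message}
        self.next_tag = {}  # channel id -> next per-channel delivery tag
        self._channels = 0

    def open_channel(self):
        self._channels += 1
        self.unacked[self._channels] = {}
        self.next_tag[self._channels] = 1  # tags restart at 1 on every channel
        return self._channels

    def get(self, channel):
        message = self.ready.popleft()
        tag = self.next_tag[channel]
        self.next_tag[channel] += 1
        self.unacked[channel][tag] = message
        return tag, message

    def ack(self, channel, tag):
        del self.unacked[channel][tag]

    def close_channel(self, channel):
        # Anything delivered but never acked goes back to the front of the queue.
        pending = self.unacked.pop(channel)
        del self.next_tag[channel]
        self.ready.extendleft(reversed(list(pending.values())))
```

Usage: a reader takes `m1` and `m2`, acks only `m1`, then dies. After `close_channel`, a brand-new channel receives `m2` again, followed by `m3`.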
These two are lumped together into one API simply because that was
the best design option we came up with (not for lack of trying,
but suggestions very much welcome - AFAIK nobody is happy with it).
RabbitMQ is under #2 - it can't do serializable checkpoint marks,
but it can do acks. So you can simply ignore the non-serializability.
On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org> wrote:
(Background: I recently upgraded RabbitMqIO from the 4.x to the 5.x
library. As part of this I switched to a pull-based API rather than the
previously used push-based one. This has caused some nebulous problems,
so I put up a correction PR that I think needs some eyes fairly quickly,
as I'd consider master to be broken for RabbitMQ right now. The PR keeps
the upgrade but reverts to the same push-based implementation as in 4.x:
https://github.com/apache/beam/pull/9977 )
Regardless, in trying to get the pull-based API to work, I'm finding the
interactions between RabbitMQ and Beam with CheckpointMark to be
fundamentally impossible to implement, so I'm hoping for some input here.
CheckpointMark itself must be Serializable; presumably this means it gets
shuffled around between nodes. However, 'Channel', the tunnel through
which it communicates with Rabbit to ack messages and finalize the
checkpoint, is not Serializable. As in most other CheckpointMark
implementations, the Channel field is 'transient'. When a new
CheckpointMark is instantiated, it's given a Channel. If an existing one
is supplied to the Reader's constructor (part of the 'createReader()'
interface), the channel is overwritten.
*However*, Rabbit does not support 'ack'ing messages on a channel other
than the one that consumed them in the first place. Attempting to do so
results in a '406 (PRECONDITION_FAILED) - unknown delivery tag' error.
(See
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
).
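The failure mode can be reproduced in a small Python model. It assumes a hypothetical `Channel` that, like RabbitMQ, only accepts acks for delivery tags it issued itself. `PreconditionFailed` stands in for the 406 error, and none of this is the real client API. A restored mark keeps its old tags but is handed the reader's new channel, so finalizing it fails.

```python
class PreconditionFailed(Exception):
    """Hypothetical stand-in for RabbitMQ's 406 PRECONDITION_FAILED channel error."""


class Channel:
    """Hypothetical channel: delivery tags are scoped to the channel that issued them."""

    def __init__(self):
        self._next_tag = 1
        self._unacked = set()

    def deliver(self):
        tag = self._next_tag
        self._next_tag += 1
        self._unacked.add(tag)
        return tag

    def ack(self, tag):
        if tag not in self._unacked:
            raise PreconditionFailed(f"406 PRECONDITION_FAILED - unknown delivery tag {tag}")
        self._unacked.remove(tag)


class CheckpointMark:
    def __init__(self, channel, tags=()):
        self.channel = channel  # 'transient' in the Java original
        self.tags = list(tags)

    def finalize_checkpoint(self):
        for tag in self.tags:
            self.channel.ack(tag)


class Reader:
    def __init__(self, channel, restored_mark=None):
        # A restored mark keeps its delivery tags, but its channel is
        # overwritten with the reader's new one, as described above.
        self.mark = restored_mark if restored_mark is not None else CheckpointMark(channel)
        self.mark.channel = channel
```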
Truthfully, I don't really understand how the current implementation is
working; it seems like a happy accident. But I'm curious whether someone
could help me debug this and work out how to bridge Beam's
reusable/serializable CheckpointMark requirement with this limitation of
Rabbit.
Thanks,
-Daniel Robert