Hi, as I said, I didn't dig too deep into that, but what I saw was [1].
Generally, if RabbitMQ had no way to recover a subscription
(which I don't think is the case), it would be incompatible
not just with Beam, but with any fault-tolerant semantics.
[1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle
On 14 Nov 2019 at 13:06, Daniel Robert
<daniel.rob...@acm.org> wrote:
On 11/14/19 2:32 AM, Jan Lukavský wrote:
Hi Danny,
as Eugene pointed out, there are essentially two "modes of
operation" of CheckpointMark. It can:
a) be used to somehow restore state of a reader (in call to
UnboundedSource#createReader)
b) confirm processed elements in
CheckpointMark#finalizeCheckpoint
If your source doesn't provide a persistent position in the
data stream that can be referred to (and serialized - Kafka
offsets would be an example of this), then what you actually
need to serialize is not the channel, but a way to restore
it - e.g. by opening a new channel with a given 'consumer
group name'. Then you just use this checkpoint to commit
your processed data in finalizeCheckpoint.
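A minimal sketch of that idea (all names here are hypothetical, not the actual RabbitMqIO code): the checkpoint mark serializes only the identifier needed to re-establish the subscription, while the live channel stays transient and must be re-opened by the new reader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch: the checkpoint persists only a restore
// identifier (e.g. a consumer group name), never the live channel.
public class RestorableCheckpointDemo {

    static class RestorableCheckpoint implements Serializable {
        private static final long serialVersionUID = 1L;

        // Survives serialization: enough information to re-subscribe.
        final String subscriptionId;

        // Does NOT survive serialization: the live connection/channel.
        transient Object channel;

        RestorableCheckpoint(String subscriptionId, Object channel) {
            this.subscriptionId = subscriptionId;
            this.channel = channel;
        }
    }

    static RestorableCheckpoint roundTrip(RestorableCheckpoint cp)
            throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(cp);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (RestorableCheckpoint) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        RestorableCheckpoint cp =
            new RestorableCheckpoint("consumer-group-1", new Object());
        RestorableCheckpoint restored = roundTrip(cp);

        // The identifier survives; the channel is gone and has to be
        // re-opened (e.g. in UnboundedSource#createReader).
        System.out.println(restored.subscriptionId);  // consumer-group-1
        System.out.println(restored.channel == null); // true
    }
}
```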
Note that finalizeCheckpoint is not guaranteed to be
called - that can happen in cases where an error occurs and
the source has to be rewound - that is what the direct
runner emulates with the probability of 'readerReuseChance'.
I've only read the RabbitMQ documentation very quickly, but
if I understand it correctly, then you have to create a
subscription to the broker, serialize the identifier of the
subscription into the CheckpointMark and then just recover
the subscription in the call to
UnboundedSource#createReader. That should do the trick.
I have not seen any such documentation in rabbit. My
understanding is it has to be the same, physical connection and
channel. Can you cite the source you were looking at?
-Danny
Hope this helps, sorry if I'm not using 100% correct RabbitMQ
terminology - as I said, I'm not quite familiar with it.
Best,
Jan
On 11/14/19 5:26 AM, Daniel Robert wrote:
I believe I've nailed down a situation that happens in
practice that causes Beam and Rabbit to be incompatible.
It seems that runners can and do make assumptions about
the serializability (via Coder) of a CheckpointMark.
To start, these are the semantics of RabbitMQ:
- the client establishes a connection to the server
- client opens a channel on the connection
- messages are either pulled or pushed to the client from
the server along this channel
- when messages are done processing, they are
acknowledged *client-side* and must be acknowledged on
the *same channel* that originally received the message.
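Those semantics can be modeled with a small simulation (a stand-in for the real com.rabbitmq.client API, not actual client code): each channel only recognizes the delivery tags it issued, so acknowledging on any other channel fails, analogous to RabbitMQ's 406 PRECONDITION-FAILED.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of RabbitMQ's ack scoping: delivery tags are only valid
// on the channel that delivered the message. (Simulation only.)
public class ChannelAckDemo {

    static class FakeChannel {
        private final Set<Long> issuedTags = new HashSet<>();
        private long nextTag = 1;

        // Server delivers a message and returns its delivery tag.
        long deliver() {
            long tag = nextTag++;
            issuedTags.add(tag);
            return tag;
        }

        // Client-side acknowledgement; only valid on this channel.
        void basicAck(long tag) {
            if (!issuedTags.remove(tag)) {
                // Models "406 (PRECONDITION-FAILED) - unknown delivery tag"
                throw new IllegalStateException(
                    "unknown delivery tag: " + tag);
            }
        }
    }

    public static void main(String[] args) {
        FakeChannel original = new FakeChannel();
        FakeChannel other = new FakeChannel();

        long tag = original.deliver();
        original.basicAck(tag);          // OK: same channel

        long tag2 = original.deliver();
        try {
            other.basicAck(tag2);        // fails: different channel
        } catch (IllegalStateException e) {
            System.out.println("ack rejected: " + e.getMessage());
        }
    }
}
```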
Since a channel (or any open connection) is
non-serializable, a CheckpointMark that has been
serialized can never be used to acknowledge these
messages and correctly 'finalize' the checkpoint. It
also, as previously discussed in this thread, implies a
rabbit Reader cannot accept an existing CheckpointMark at
all; the Reader and the CheckpointMark must share the
same connection to the rabbit server ("channel").
Next, I've found how DirectRunner (and presumably others)
can attempt to serialize a CheckpointMark that has not
been finalized. In
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
the DirectRunner applies a probability and if it hits, it
sets the current reader to 'null' but retains the
existing CheckpointMark, which it then attempts to pass
to a new reader via a Coder.
This leaves the shard, the runner, and the reader with
differing views of the world. In
UnboundedReadEvaluatorFactory's processElement function,
a call to getReader(shard) (
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
) clones the shard's checkpoint mark and passes that to
the new reader. The reader ignores it, creating its own,
but even if it accepted it, it would be accepting a
serialized CheckpointMark, which wouldn't work. Later,
the runner calls finishRead (
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
). The shard's CheckpointMark (unserialized; which should
still be valid) is finalized. The reader's CheckpointMark
(which may be a different instance) becomes the return
value, which is referred to as "finishedCheckpoint" in
the calling code, which is misleading at best and
problematic at worst as *this* checkpoint has not been
finalized.
So, tl;dr: I cannot find any means of maintaining a
persistent connection to the server for finalizing
checkpoints that is safe across runners. If there's a
guarantee all of the shards are on the same JVM instance,
I could rely on global, static collections/instances as a
workaround, but if other runners might serialize this
across the wire, I'm stumped. The only workable situation
I can think of right now is to proactively acknowledge
messages as they are received and effectively no-op in
finalizeCheckpoint. This is very different, semantically,
and can lead to dropped messages if a pipeline doesn't
finish processing the given message.
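That "ack on receive" fallback can be sketched as follows (a simulation with made-up names, not actual RabbitMqIO code): the broker forgets a message the moment it is delivered, so a crash after receipt but before processing silently drops it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the "ack on receive" workaround: nothing can be
// redelivered once handed out, so an unprocessed message is lost.
public class AckOnReceiveDemo {

    static class FakeBroker {
        private final Deque<String> queue = new ArrayDeque<>();

        void publish(String msg) { queue.add(msg); }

        // autoAck-style delivery: the message is gone once delivered.
        String receiveAndAck() { return queue.poll(); }

        int undelivered() { return queue.size(); }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        broker.publish("m1");

        String msg = broker.receiveAndAck();     // acked on receipt
        // Simulated crash before the pipeline processes `msg`:
        // the broker has nothing left to redeliver.
        System.out.println(msg);                 // m1
        System.out.println(broker.undelivered()); // 0 -> m1 is lost
    }
}
```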
Any help would be much appreciated.
Thanks,
-Danny
On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
Hi Daniel,
This is probably insufficiently well documented. The
CheckpointMark is used for two purposes:
1) To persistently store some notion of how much of
the stream has been consumed, so that if something
fails we can tell the underlying streaming system
where to start reading when we re-create the reader.
This is why CheckpointMark is Serializable. E.g. this
makes sense for Kafka.
2) To do acks - to let the underlying streaming
system know that the Beam pipeline will never need
data up to this CheckpointMark. Acking does not
require serializability - runners call ack() on the
same in-memory instance of CheckpointMark that was
produced by the reader. E.g. this makes sense for
RabbitMq or Pubsub.
In practice, these two capabilities tend to be
mutually exclusive: some streaming systems can
provide a serializable CheckpointMark, some can do
acks, some can do neither - but very few (or none)
can do both, and it's debatable whether it even makes
sense for a system to provide both capabilities:
usually acking is an implicit form of
streaming-system-side checkpointing, i.e. when you
re-create the reader you don't actually need to carry
over any information from an old CheckpointMark - the
necessary state (which records should be delivered)
is maintained on the streaming system side.
These two are lumped together into one API simply
because that was the best design option we came up
with (not for lack of trying, but suggestions very
much welcome - AFAIK nobody is happy with it).
RabbitMQ is under #2 - it can't do serializable
checkpoint marks, but it can do acks. So you can
simply ignore the non-serializability.
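A sketch of pattern #2 (illustrative names only, simplified from what a real ack-based checkpoint might look like): the mark holds a transient channel and the pending delivery tags, finalizeCheckpoint acks on the same in-memory instance, and the serialized form carries no usable state.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Sketch of an "ack-only" checkpoint mark (pattern #2 above).
// Names are illustrative, not the actual RabbitMqIO classes.
public class AckCheckpointDemo {

    // Stand-in for a live channel's acknowledgement call.
    interface AckChannel {
        void basicAck(long deliveryTag);
    }

    static class AckCheckpointMark implements Serializable {
        private static final long serialVersionUID = 1L;

        // Lost on serialization - only the original in-memory
        // instance can actually acknowledge anything.
        private final transient AckChannel channel;
        private final transient List<Long> pendingTags = new ArrayList<>();

        AckCheckpointMark(AckChannel channel) { this.channel = channel; }

        void record(long deliveryTag) { pendingTags.add(deliveryTag); }

        // Called by the runner on the same in-memory instance.
        void finalizeCheckpoint() {
            for (long tag : pendingTags) {
                channel.basicAck(tag);
            }
            pendingTags.clear();
        }
    }

    public static void main(String[] args) {
        List<Long> acked = new ArrayList<>();
        AckCheckpointMark mark = new AckCheckpointMark(acked::add);
        mark.record(1L);
        mark.record(2L);
        mark.finalizeCheckpoint();
        System.out.println(acked);  // [1, 2]
    }
}
```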
On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
<daniel.rob...@acm.org
<mailto:daniel.rob...@acm.org>> wrote:
(Background: I recently upgraded RabbitMqIO from
the 4.x to 5.x library.
As part of this I switched to a pull-based API
rather than the
previously-used push-based. This has caused some
nebulous problems, so I put up a correction PR
that I think needs some
eyes fairly quickly as
I'd consider master to be broken for rabbitmq
right now. The PR keeps
the upgrade but reverts to the same push-based
implementation as in 4.x:
https://github.com/apache/beam/pull/9977 )
Regardless, in trying to get the pull-based API
to work, I'm finding the
interactions between rabbitmq and beam with
CheckpointMark to be
fundamentally impossible to implement, so I'm
hoping for some input here.
CheckpointMark itself must be Serializable;
presumably this means it gets
shuffled around between nodes. However 'Channel',
the tunnel through
which it communicates with Rabbit to ack messages
and finalize the
checkpoint, is non-Serializable. Like most other
CheckpointMark
implementations, Channel is 'transient'. When a
new CheckpointMark is
instantiated, it's given a Channel. If an
existing one is supplied to
the Reader's constructor (part of the
'startReader()' interface), the
channel is overwritten.
*However*, Rabbit does not support 'ack'ing
messages on a channel other
than the one that consumed them in the first
place. Attempting to do so
results in a '406 (PRECONDITION-FAILED) - unknown
delivery tag'. (See
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
).
Truthfully, I don't really understand how the
current implementation is
working; it seems like a happy accident. But I'm
curious if someone
could help me figure out how to bridge the
re-usable/serializable CheckpointMark requirement
in Beam with this
limitation of Rabbit.
Thanks,
-Daniel Robert