Hi, as I said, I didn't dig too deep into that, but what I saw
was [1].
Generally, if RabbitMQ had no way to recover a subscription
(which I don't think is the case), then it would be
incompatible not just with Beam, but with any
fault-tolerant semantics.
[1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle
On 11/14/19 1:06 PM, Daniel Robert
<daniel.rob...@acm.org> wrote:
On 11/14/19 2:32 AM, Jan Lukavský wrote:
Hi Danny,
as Eugene pointed out, there are essentially two "modes
of operation" of CheckpointMark. It can:
a) be used to restore the state of a reader (in the call
to UnboundedSource#createReader)
b) confirm processed elements in
CheckpointMark#finalizeCheckpoint
If your source doesn't provide a persistent position in
the data stream that can be referred to (and serialized;
Kafka offsets are an example), then what you
actually need to serialize is not the channel, but a way
to restore it, e.g. by opening a new channel with a
given 'consumer group name'. Then you just use this
checkpoint to commit your processed data in
finalizeCheckpoint.
Note that finalizeCheckpoint is not guaranteed to be
called. That can happen when an error occurs
and the source has to be rewound, which is what the
DirectRunner emulates with the probability
'readerReuseChance'.
I'm reading the RabbitMQ documentation very quickly,
but if I understand it correctly, you have to create
a subscription to the broker, serialize the identifier of the
subscription into the CheckpointMark and then just
recover the subscription in the call to
UnboundedSource#createReader. That should do the trick.
I have not seen any such documentation for Rabbit. My
understanding is that it has to be the same physical connection
and channel. Can you cite the source you were looking at?
-Danny
Hope this helps. Sorry if I'm not using 100% correct
RabbitMQ terminology; as I said, I'm not quite familiar
with it.
Best,
Jan
On 11/14/19 5:26 AM, Daniel Robert wrote:
I believe I've nailed down a situation that happens
in practice that causes Beam and Rabbit to be
incompatible. It seems that runners can and do make
assumptions about the serializability (via Coder) of
a CheckpointMark.
To start, these are the semantics of RabbitMQ:
- the client establishes a connection to the server
- client opens a channel on the connection
- messages are either pulled or pushed to the client
from the server along this channel
- when messages are done processing, they are
acknowledged *client-side* and must be acknowledged
on the *same channel* that originally received the
message.
Since a channel (or any open connection) is
non-serializable, it means that a CheckpointMark that
has been serialized cannot ever be used to
acknowledge these messages and correctly 'finalize'
the checkpoint. It also, as previously discussed in
this thread, implies a rabbit Reader cannot accept an
existing CheckpointMark at all; the Reader and the
CheckpointMark must share the same connection to the
rabbit server ("channel").
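The serialization problem described above can be reproduced without Beam or RabbitMQ. The following is a minimal sketch using plain Java serialization; FakeChannel and AckingMark are hypothetical stand-ins, not classes from RabbitMqIO, and a real runner would go through a Coder rather than ObjectOutputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a live broker channel; deliberately not Serializable.
class FakeChannel {
  final List<Long> acked = new ArrayList<>();

  void basicAck(long deliveryTag) {
    acked.add(deliveryTag);
  }
}

// Hypothetical checkpoint mark: the tags survive serialization, the channel does not.
class AckingMark implements Serializable {
  private static final long serialVersionUID = 1L;

  final ArrayList<Long> deliveryTags = new ArrayList<>();
  transient FakeChannel channel;

  AckingMark(FakeChannel channel) {
    this.channel = channel;
  }

  /** Acks every recorded tag on the live channel, if there still is one. */
  void finalizeCheckpoint() {
    if (channel == null) {
      throw new IllegalStateException("mark was deserialized; no live channel to ack on");
    }
    for (long tag : deliveryTags) {
      channel.basicAck(tag);
    }
    deliveryTags.clear();
  }

  /** Simulates a runner cloning the mark through serialization. */
  AckingMark cloneViaSerialization() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(this);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (AckingMark) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

In this sketch the cloned mark still lists the delivery tags, but its channel field is null, so only the original in-memory instance can finalize.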
Next, I've found how DirectRunner (and presumably
others) can attempt to serialize a CheckpointMark
that has not been finalized. In
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
the DirectRunner applies a probability and if it
hits, it sets the current reader to 'null' but
retains the existing CheckpointMark, which it then
attempts to pass to a new reader via a Coder.
This leaves the shard, the runner, and the reader with
differing views of the world. In
UnboundedReadEvaluatorFactory's processElement
function, a call to getReader(shard) (
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
) clones the shard's checkpoint mark and passes that
to the new reader. The reader ignores it, creating
its own, but even if it accepted it, it would be
accepting a serialized CheckpointMark, which wouldn't
work. Later, the runner calls finishRead (
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
). The shard's CheckpointMark (unserialized; which
should still be valid) is finalized. The reader's
CheckpointMark (which may be a different instance)
becomes the return value, which is referred to as
"finishedCheckpoint" in the calling code, which is
misleading at best and problematic at worst as *this*
checkpoint has not been finalized.
So, tl;dr: I cannot find any means of maintaining a
persistent connection to the server for finalizing
checkpoints that is safe across runners. If there's a
guarantee all of the shards are on the same JVM
instance, I could rely on global, static
collections/instances as a workaround, but if other
runners might serialize this across the wire, I'm
stumped. The only workable option I can think of
right now is to proactively acknowledge messages as
they are received and effectively no-op in
finalizeCheckpoint. This is very different,
semantically, and can lead to dropped messages if a
pipeline doesn't finish processing the given message.
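To make the trade-off of that workaround concrete, here is a toy model of the two ack timings. It is only a sketch under simplifying assumptions: AckTimingModel and its parameters are hypothetical, messages are processed in order, and committed <= received <= messages.size().

```java
import java.util.ArrayList;
import java.util.List;

// Toy model with hypothetical names: a message is "lost" if the broker was told to
// forget it (acked) before the pipeline durably finished with it (committed).
class AckTimingModel {
  /**
   * @param messages the messages in queue order
   * @param received how many messages the reader pulled before the crash
   * @param committed how many of those the pipeline had durably processed
   * @param ackOnReceive true = ack immediately; false = ack in finalizeCheckpoint
   * @return the messages that can no longer be redelivered after the crash
   */
  static List<String> lostOnCrash(
      List<String> messages, int received, int committed, boolean ackOnReceive) {
    // With eager acks the broker forgets everything received; with acks deferred
    // to finalizeCheckpoint it forgets only what was committed.
    int acked = ackOnReceive ? received : committed;
    List<String> lost = new ArrayList<>();
    for (int i = committed; i < acked; i++) {
      lost.add(messages.get(i));
    }
    return lost;
  }
}
```

With eager acks, every message received but not yet committed at the time of the crash is gone; with acks deferred to finalizeCheckpoint, the same messages stay unacked on the broker and can be redelivered.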
Any help would be much appreciated.
Thanks,
-Danny
On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
Hi Daniel,
This is probably insufficiently well documented.
The CheckpointMark is used for two purposes:
1) To persistently store some notion of how much
of the stream has been consumed, so that if
something fails we can tell the underlying
streaming system where to start reading when we
re-create the reader. This is why CheckpointMark
is Serializable. E.g. this makes sense for Kafka.
2) To do acks - to let the underlying streaming
system know that the Beam pipeline will never
need data up to this CheckpointMark. Acking does
not require serializability - runners call ack()
on the same in-memory instance of CheckpointMark
that was produced by the reader. E.g. this makes
sense for RabbitMq or Pubsub.
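Purpose 1 can be sketched with a mark that carries nothing but a position. This is an illustration only: OffsetMark and LogReader are hypothetical names, and a List stands in for a replayable, Kafka-like partition.

```java
import java.io.Serializable;
import java.util.List;

// Hypothetical offset-style mark: the position alone is enough to resume.
class OffsetMark implements Serializable {
  private static final long serialVersionUID = 1L;

  final int nextOffset;

  OffsetMark(int nextOffset) {
    this.nextOffset = nextOffset;
  }
}

// Hypothetical reader over a replayable log.
class LogReader {
  private final List<String> log;
  private int position;

  /** A null mark means "start from the beginning". */
  LogReader(List<String> log, OffsetMark restoredFrom) {
    this.log = log;
    this.position = restoredFrom == null ? 0 : restoredFrom.nextOffset;
  }

  /** Returns the next record, or null when the log is exhausted. */
  String advance() {
    return position < log.size() ? log.get(position++) : null;
  }

  /** Captures where a replacement reader should resume. */
  OffsetMark getCheckpointMark() {
    return new OffsetMark(position);
  }
}
```

Because the mark holds only an int, it can be encoded, shipped to another worker, and handed to a fresh reader. A source of kind 2 has no such position to store, which is why its mark only makes sense as the in-memory instance the reader produced.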
In practice, these two capabilities tend to be
mutually exclusive: some streaming systems can
provide a serializable CheckpointMark, some can
do acks, some can do neither - but very few (or
none) can do both, and it's debatable whether it
even makes sense for a system to provide both
capabilities: usually acking is an implicit form
of streaming-system-side checkpointing, i.e. when
you re-create the reader you don't actually need
to carry over any information from an old
CheckpointMark - the necessary state (which
records should be delivered) is maintained on the
streaming system side.
These two are lumped together into one API simply
because that was the best design option we came
up with (not for lack of trying, but suggestions
very much welcome - AFAIK nobody is happy with it).
RabbitMQ is under #2 - it can't do serializable
checkpoint marks, but it can do acks. So you can
simply ignore the non-serializability.
On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
<daniel.rob...@acm.org
<mailto:daniel.rob...@acm.org>> wrote:
(Background: I recently upgraded RabbitMqIO
from the 4.x to 5.x library.
As part of this I switched to a pull-based
API rather than the
previously used push-based one. This has caused
some nebulous problems, so
I put up a correction PR that I think needs
some eyes fairly quickly as
I'd consider master to be broken for rabbitmq
right now. The PR keeps
the upgrade but reverts to the same
push-based implementation as in 4.x:
https://github.com/apache/beam/pull/9977 )
Regardless, in trying to get the pull-based
API to work, I'm finding the
interactions between rabbitmq and beam with
CheckpointMark to be
fundamentally impossible to implement so I'm
hoping for some input here.
CheckpointMark itself must be Serializable;
presumably this means it gets
shuffled around between nodes. However,
'Channel', the tunnel through
which it communicates with Rabbit to ack
messages and finalize the
checkpoint, is non-Serializable. As in most
other CheckpointMark
implementations, the Channel field is 'transient'. When
a new CheckpointMark is
instantiated, it's given a Channel. If an
existing one is supplied to
the Reader's constructor (part of the
'startReader()' interface), the
channel is overwritten.
*However*, Rabbit does not support 'ack'ing
messages on a channel other
than the one that consumed them in the first
place. Attempting to do so
results in a '406 (PRECONDITION-FAILED) -
unknown delivery tag'. (See
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
).
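The channel-scoped behaviour described above can be modelled in a few lines. This is a toy in-memory sketch, not the RabbitMQ Java client: ToyBroker and ToyChannel are hypothetical, and basicGet/basicAck only borrow the names of the real client's methods, with simplified signatures.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Toy in-memory model, NOT the RabbitMQ client API: all names are hypothetical.
class ToyBroker {
  private final Deque<String> ready = new ArrayDeque<>();

  void publish(String body) {
    ready.addLast(body);
  }

  ToyChannel openChannel() {
    return new ToyChannel(this);
  }

  int readyCount() {
    return ready.size();
  }

  String take() {
    return ready.pollFirst();
  }

  void requeue(String body) {
    ready.addFirst(body);
  }
}

class ToyChannel {
  private final ToyBroker broker;
  // Delivery tags are counters private to this channel.
  private final TreeMap<Long, String> unacked = new TreeMap<>();
  private long nextTag = 1;

  ToyChannel(ToyBroker broker) {
    this.broker = broker;
  }

  /** Pulls one message; returns its delivery tag, or -1 if the queue is empty. */
  long basicGet() {
    String body = broker.take();
    if (body == null) {
      return -1;
    }
    unacked.put(nextTag, body);
    return nextTag++;
  }

  /** Acks a delivery made on this channel; an unknown tag closes the channel. */
  void basicAck(long deliveryTag) {
    if (unacked.remove(deliveryTag) == null) {
      close();
      throw new IllegalStateException(
          "406 (PRECONDITION-FAILED) - unknown delivery tag " + deliveryTag);
    }
  }

  /** Closing a channel returns its unacked messages to the queue in original order. */
  void close() {
    for (String body : unacked.descendingMap().values()) {
      broker.requeue(body);
    }
    unacked.clear();
  }
}
```

In this model, a tag obtained from one channel means nothing to another channel: acking it there fails with the 406 error, while acking it on the consuming channel succeeds. Closing a channel with unacked deliveries puts those messages back on the queue, which is also how the real broker treats a closed channel.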
Truthfully, I don't really understand how the
current implementation is
working; it seems like a happy accident. But
I'm curious if someone
could help me debug and implement how to
bridge the
re-usable/serializable CheckpointMark
requirement in Beam with this
limitation of Rabbit.
Thanks,
-Daniel Robert