Hi Daniel,
On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert <daniel.rob...@acm.org>
wrote:
I believe I've nailed down a situation that happens in practice
that causes Beam and Rabbit to be incompatible. It seems that
runners can and do make assumptions about the serializability
(via Coder) of a CheckpointMark.
To start, these are the semantics of RabbitMQ:
- the client establishes a connection to the server
- the client opens a channel on the connection
- messages are either pulled by the client or pushed to it by the
server along this channel
- once processing is done, messages are acknowledged *client-side*,
and each must be acknowledged on the *same channel* that originally
received it.
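A toy model of these channel semantics in plain Java may help pin down the constraint. The classes and methods below are hypothetical (this is NOT the RabbitMQ Java client API); the only property taken from the list above is that an ack must happen on the channel that delivered the message. Modeling delivery tags as per-channel counters is an assumption of the sketch.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model (hypothetical classes, NOT the RabbitMQ Java client):
// a delivery tag is only meaningful on the channel that handed it out.
class ChannelScopeDemo {

    static class Channel {
        private final int id;
        private long nextTag = 1;                          // per-channel tag counter
        private final Set<Long> unacked = new HashSet<>();

        Channel(int id) { this.id = id; }

        // Server delivers a message on this channel; the client gets back its tag.
        long deliver() {
            long tag = nextTag++;
            unacked.add(tag);
            return tag;
        }

        // Client-side ack: only tags this channel delivered (and not yet acked) are accepted.
        void ack(long tag) {
            if (!unacked.remove(tag)) {
                throw new IllegalStateException(
                        "406 PRECONDITION_FAILED - unknown delivery tag " + tag + " (channel " + id + ")");
            }
        }
    }

    // True if acking on the delivering channel succeeds.
    static boolean ackOnSameChannelAccepted() {
        Channel consumer = new Channel(1);
        long tag = consumer.deliver();
        consumer.ack(tag);   // would throw if the tag were unknown here
        return true;
    }

    // True if acking the same tag on a different channel is rejected.
    static boolean ackOnOtherChannelRejected() {
        Channel consumer = new Channel(1);
        Channel other = new Channel(2);
        long tag = consumer.deliver();
        try {
            other.ack(tag);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("same channel accepted:  " + ackOnSameChannelAccepted());
        System.out.println("other channel rejected: " + ackOnOtherChannelRejected());
    }
}
```

Because the counters are per channel in this model, a tag carried over to a different channel is at best unknown there (as above) and at worst names a different message that channel happened to deliver.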
Since a channel (like any open connection) is not serializable, a
CheckpointMark that has been serialized can never be used to
acknowledge these messages and correctly 'finalize' the checkpoint.
As previously discussed in this thread, it also implies that a Rabbit
Reader cannot accept an existing CheckpointMark at all: the Reader
and the CheckpointMark must share the same connection to the Rabbit
server (the "channel").
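The serialization half of this argument can be shown with plain Java serialization standing in for a Beam Coder (an assumption; a runner may encode marks differently). The mark class is hypothetical: its delivery tags survive the round trip, but its `transient` channel comes back as `null`, so the copy has nothing to ack on.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class TransientChannelDemo {

    // Stand-in for a live, non-serializable connection/channel (hypothetical).
    static class LiveChannel {
        final int id;
        LiveChannel(int id) { this.id = id; }
    }

    // Hypothetical Rabbit-style mark: tags are plain data, the channel is a live handle.
    static class RabbitLikeMark implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<Long> deliveryTags = new ArrayList<>();
        transient LiveChannel channel;   // skipped by serialization, null after readObject

        RabbitLikeMark(LiveChannel channel) { this.channel = channel; }

        boolean canAck() { return channel != null; }
    }

    // Serialize and deserialize, roughly what happens when a runner encodes a mark.
    static RabbitLikeMark roundTrip(RabbitLikeMark mark) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(mark);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (RabbitLikeMark) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        RabbitLikeMark original = new RabbitLikeMark(new LiveChannel(1));
        original.deliveryTags.add(1L);
        original.deliveryTags.add(2L);
        RabbitLikeMark copy = roundTrip(original);
        System.out.println("original: tags=" + original.deliveryTags + " canAck=" + original.canAck());
        System.out.println("copy:     tags=" + copy.deliveryTags + " canAck=" + copy.canAck());
    }
}
```

Even if the copy were handed a fresh channel, its tags would belong to the old one, which is exactly the cross-channel ack case RabbitMQ rejects.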
This is correct.
Next, I've found how DirectRunner (and presumably others) can
attempt to serialize a CheckpointMark that has not been
finalized. In
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
the DirectRunner applies a random probability and, if it hits, sets
the current reader to 'null' but retains the existing
CheckpointMark, which it then attempts to pass to a new reader
via a Coder.
Correct, this simulates a failure scenario:
- The runner was reading the source and, after finalizing a bunch of
previous CheckpointMarks, obtained a new one and serialized it so
that things can be restored in case of failure
- A failure happened before the current CheckpointMark could be
finalized, which means Beam was not able to guarantee that elements
read after the last-finalized mark have been durably processed, so
they may need to be re-read; hence the runner recreates a reader
from the current mark.
This leaves the shard, the runner, and the reader with differing
views of the world. In UnboundedReadEvaluatorFactory's
processElement function, a call to getReader(shard) (
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
) clones the shard's checkpoint mark and passes that to the new
reader. The reader ignores it, creating its own, but even if it
accepted it, it would be accepting a serialized CheckpointMark,
which wouldn't work.
Correct in the sense that for a RabbitMQ reader, a CheckpointMark
doesn't affect what the reader will read: it depends only on the
broker's internal state (which in turn depends on which messages have
been acked by previously finalized CheckpointMarks).
Later, the runner calls finishRead (
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
). The shard's CheckpointMark (unserialized, so it should still
be valid) is finalized. The reader's CheckpointMark (which may be
a different instance) becomes the return value, and the calling
code refers to it as "finishedCheckpoint". That name is misleading
at best and problematic at worst, as *this* checkpoint has not
been finalized.
I'm not following what the problem is here. In that code, "oldMark"
is the last checkpoint mark to be finalized; calling
finalizeCheckpoint on it signals that Beam has durably processed all
the messages read from the reader until that mark. "mark" (the new
one) represents the state of the reader after the last finalized
mark, so it should not be finalized.
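A stripped-down sketch of that ordering may make it easier to see why the returned mark is not finalized. It is modeled loosely on the finishRead logic being discussed, not copied from UnboundedReadEvaluatorFactory, and the Mark and Reader classes are hypothetical.

```java
// Hypothetical sketch of the finishRead ordering; not the DirectRunner source.
class FinishReadDemo {

    // Minimal mark; a RabbitMQ mark would ack its delivery tags in finalizeCheckpoint().
    static class Mark {
        final String name;
        boolean finalized = false;
        Mark(String name) { this.name = name; }
        void finalizeCheckpoint() { finalized = true; }
    }

    // Minimal reader that hands out a fresh, numbered mark each time it is asked.
    static class Reader {
        private int count = 0;
        Mark getCheckpointMark() { return new Mark("M" + (++count)); }
    }

    // The previous mark is finalized because the work it covers has been committed;
    // the new mark is only returned, to be persisted as the next restore point.
    static Mark finishRead(Reader reader, Mark oldMark) {
        Mark mark = reader.getCheckpointMark();
        if (oldMark != null) {
            oldMark.finalizeCheckpoint();
        }
        return mark;
    }

    public static void main(String[] args) {
        Reader reader = new Reader();
        Mark m1 = finishRead(reader, null);  // first bundle: nothing to finalize yet
        Mark m2 = finishRead(reader, m1);    // next bundle: M1 is finalized, M2 is not
        System.out.println(m1.name + " finalized=" + m1.finalized);  // M1 finalized=true
        System.out.println(m2.name + " finalized=" + m2.finalized);  // M2 finalized=false
    }
}
```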
I.e. AFAIR in a hypothetical runner (which DirectRunner tries to
emulate) things go like this:
- Create a reader
- Let mark M1 = reader.getCheckpointMark()
- Durably persist M1 as the "restore point" of this reader
- ...read messages A B C from the reader and durably process them...
- Finalize M1 (acks A B C)
- Let mark M2 = reader.getCheckpointMark()
- Durably persist M2 as the "restore point" of this reader
- ...read messages D E F and durably process them...
- Finalize M2 (acks D E F)
Now let's imagine a failure after M1 has been finalized:
- Durably persist M2 as the "restore point" of this reader
- ...read messages D E, and then a failure happens
- Recreate the reader from M2 (the reader ignores M2, but that
doesn't matter)
Since M2 was not finalized, messages D E F were not acked, and
RabbitMQ will redeliver them to this reader. D E will be processed
twice, but only the results of this new processing will be durable.
- Finalize M2 (acks D E F)
Etc.
Basically you can think of this as a series of micro-bundles, where
micro-bundles are delimited by checkpoint marks, and each
micro-bundle is a runner-side transaction which either commits or
discards the results of processing all messages in this micro-bundle.
After a micro-bundle [M1, M2) commits, the runner calls
M1.finalizeCheckpoint() and persists M2 as the new restore point
in case of failure.
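The failure trace and the micro-bundle view above can be checked with a small simulation. Everything here is a hypothetical toy: the broker is modeled as requeueing a dead consumer's unacked messages at the head of the queue, each bundle's messages are tracked by one mark, and that mark is finalized only after the bundle's results commit. It illustrates the redelivery argument, not how any particular runner is written.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class MicroBundleDemo {

    // Toy broker: ready messages plus those delivered but not yet acked.
    static class Broker {
        final Deque<String> ready = new ArrayDeque<>();
        final List<String> unacked = new ArrayList<>();

        String deliver() {
            String msg = ready.pollFirst();
            if (msg != null) unacked.add(msg);
            return msg;
        }

        void ack(List<String> msgs) { unacked.removeAll(msgs); }

        // Modeled behavior: a dead consumer's unacked messages return to the head, in order.
        void consumerDied() {
            for (int i = unacked.size() - 1; i >= 0; i--) ready.addFirst(unacked.get(i));
            unacked.clear();
        }
    }

    // Hypothetical mark: messages read since the previous mark, plus a live broker handle.
    static class Mark {
        final List<String> messages = new ArrayList<>();
        final Broker broker;
        Mark(Broker broker) { this.broker = broker; }
        void finalizeCheckpoint() { broker.ack(messages); }
    }

    // Hypothetical reader; a restore mark would be ignored, so the constructor takes none.
    static class Reader {
        private final Broker broker;
        private Mark current;
        Reader(Broker broker) { this.broker = broker; this.current = new Mark(broker); }

        String read() {
            String msg = broker.deliver();
            if (msg != null) current.messages.add(msg);
            return msg;
        }

        Mark getCheckpointMark() {
            Mark done = current;
            current = new Mark(broker);
            return done;
        }
    }

    // One micro-bundle: read up to `size` messages, commit their results, then finalize.
    // If `crashAfter` messages have been read, the bundle dies before committing anything.
    static boolean runBundle(Reader reader, int size, int crashAfter,
                             List<String> durable, List<String> processed) {
        List<String> results = new ArrayList<>();         // uncommitted results
        for (int i = 0; i < size; i++) {
            if (i == crashAfter) return false;             // results discarded, nothing acked
            String msg = reader.read();
            if (msg == null) break;
            processed.add(msg);
            results.add(msg);
        }
        durable.addAll(results);                           // 1) commit
        reader.getCheckpointMark().finalizeCheckpoint();   // 2) then ack
        return true;
    }

    // The trace from the thread: A B C succeed; D E are read and then the worker fails.
    static List<String> runScenario(List<String> processed) {
        Broker broker = new Broker();
        broker.ready.addAll(Arrays.asList("A", "B", "C", "D", "E", "F"));
        List<String> durable = new ArrayList<>();
        Reader reader = new Reader(broker);
        runBundle(reader, 3, -1, durable, processed);        // A B C committed and acked
        if (!runBundle(reader, 3, 2, durable, processed)) {  // D E read, then crash
            broker.consumerDied();                           // D E were never acked: requeued
            reader = new Reader(broker);                     // recreated reader
            runBundle(reader, 3, -1, durable, processed);    // D E F committed and acked
        }
        return durable;
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        List<String> durable = runScenario(processed);
        System.out.println("processed: " + processed);  // [A, B, C, D, E, D, E, F]
        System.out.println("durable:   " + durable);    // [A, B, C, D, E, F]
    }
}
```

D and E are processed twice, but each message's result becomes durable exactly once, matching the trace above.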
So, tl;dr: I cannot find any means of maintaining a persistent
connection to the server for finalizing checkpoints that is safe
across runners. If there's a guarantee that all of the shards are on
the same JVM instance, I could rely on global, static
collections/instances as a workaround, but if other runners might
serialize this across the wire, I'm stumped. The only workable
approach I can think of right now is to acknowledge messages
proactively as they are received and make finalizeCheckpoint
effectively a no-op. This is semantically very different, and can
lead to dropped messages if a pipeline doesn't finish processing
a given message.
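The difference between acknowledging on receipt and acknowledging in finalizeCheckpoint can be shown in a few lines. This is a hypothetical single-consumer model with one crash between receiving a message and committing its result; it ignores everything else about Beam and RabbitMQ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class AckTimingDemo {

    // Returns what ends up durably processed when the worker crashes once, after
    // receiving `crashOn` but before committing its result (hypothetical model).
    static List<String> run(boolean ackOnReceipt, String crashOn) {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("A", "B", "C"));
        List<String> durable = new ArrayList<>();
        boolean crashed = false;
        while (!queue.isEmpty()) {
            String msg = queue.pollFirst();
            if (!crashed && msg.equals(crashOn)) {
                crashed = true;
                // With ack-on-receipt the broker has already forgotten the message;
                // otherwise it is still unacked and gets redelivered.
                if (!ackOnReceipt) queue.addFirst(msg);
                continue;                  // result never committed
            }
            durable.add(msg);              // commit; with ack-on-finalize the ack follows here
        }
        return durable;
    }

    public static void main(String[] args) {
        System.out.println("ack on receipt:  " + run(true, "B"));   // [A, C]
        System.out.println("ack on finalize: " + run(false, "B"));  // [A, B, C]
    }
}
```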
Any help would be much appreciated.
If I'm misunderstanding something above, could you describe in detail
a scenario that leads to message loss, or (less severe) to
more-than-once durable processing of the same message?
Thanks,
-Danny
On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
Hi Daniel,
This is probably insufficiently well documented. The
CheckpointMark is used for two purposes:
1) To persistently store some notion of how much of the stream
has been consumed, so that if something fails we can tell the
underlying streaming system where to start reading when we
re-create the reader. This is why CheckpointMark is
Serializable. E.g. this makes sense for Kafka.
2) To do acks - to let the underlying streaming system know that
the Beam pipeline will never need data up to this
CheckpointMark. Acking does not require serializability: runners
call finalizeCheckpoint() on the same in-memory instance of
CheckpointMark that was produced by the reader. E.g. this makes
sense for RabbitMQ or Pub/Sub.
In practice, these two capabilities tend to be mutually
exclusive: some streaming systems can provide a serializable
CheckpointMark, some can do acks, some can do neither - but very
few (or none) can do both, and it's debatable whether it even
makes sense for a system to provide both capabilities: usually
acking is an implicit form of streaming-system-side
checkpointing, i.e. when you re-create the reader you don't
actually need to carry over any information from an old
CheckpointMark - the necessary state (which records should be
delivered) is maintained on the streaming system side.
These two are lumped together into one API simply because that
was the best design option we came up with (not for lack of
trying, but suggestions very much welcome - AFAIK nobody is
happy with it).
RabbitMQ is under #2 - it can't do serializable checkpoint
marks, but it can do acks. So you can simply ignore the
non-serializability.
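The two purposes can be contrasted with a pair of hypothetical marks. The `Mark` interface below only mimics the `finalizeCheckpoint()` hook discussed in this thread; it is not Beam's actual interface. An offset mark (purpose 1) is plain data a new reader can resume from. An ack mark (purpose 2) is only useful through a live handle it does not serialize. The null check in `AckMark` is an assumed, illustrative way to "ignore the non-serializability": a deserialized copy acks nothing, and the broker keeps those messages for redelivery.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class TwoKindsOfMarks {

    // Hypothetical stand-in for the checkpoint-mark hook discussed in the thread.
    interface Mark extends Serializable {
        void finalizeCheckpoint();
    }

    // Purpose 1 (Kafka-like): the mark is a position; there is nothing to do on finalize.
    static final class OffsetMark implements Mark {
        private static final long serialVersionUID = 1L;
        final int nextOffset;
        OffsetMark(int nextOffset) { this.nextOffset = nextOffset; }
        @Override public void finalizeCheckpoint() { }
    }

    // A reader over a replayable log that resumes from an OffsetMark if given one.
    static final class OffsetReader {
        private final List<String> log;
        private int offset;
        OffsetReader(List<String> log, OffsetMark restoreFrom) {
            this.log = log;
            this.offset = (restoreFrom == null) ? 0 : restoreFrom.nextOffset;
        }
        String read() { return offset < log.size() ? log.get(offset++) : null; }
        OffsetMark getCheckpointMark() { return new OffsetMark(offset); }
    }

    // Purpose 2 (RabbitMQ/Pub/Sub-like): the mark names what to ack via a live handle.
    static final class AckMark implements Mark {
        private static final long serialVersionUID = 1L;
        final List<Long> deliveryTags = new ArrayList<>();
        private transient Set<Long> brokerUnacked;   // stands in for the live channel
        AckMark(Set<Long> brokerUnacked) { this.brokerUnacked = brokerUnacked; }
        @Override public void finalizeCheckpoint() {
            if (brokerUnacked == null) {
                return;   // detached/deserialized copy: cannot ack; broker will redeliver
            }
            brokerUnacked.removeAll(deliveryTags);
        }
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("A", "B", "C");
        OffsetReader first = new OffsetReader(log, null);
        first.read();
        first.read();
        OffsetMark mark = first.getCheckpointMark();          // position 2
        OffsetReader resumed = new OffsetReader(log, mark);   // a new reader picks up at C
        System.out.println("resumed reads: " + resumed.read());

        Set<Long> unacked = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        AckMark ackMark = new AckMark(unacked);
        ackMark.deliveryTags.addAll(Arrays.asList(1L, 2L));
        ackMark.finalizeCheckpoint();                         // same in-memory instance
        System.out.println("still unacked: " + unacked);
    }
}
```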
On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
<daniel.rob...@acm.org> wrote:
(Background: I recently upgraded RabbitMqIO from the 4.x to the 5.x
library. As part of this I switched to a pull-based API rather than
the previously used push-based one. This has caused some nebulous
problems, so I put up a correction PR that I think needs some eyes
fairly quickly, as I'd consider master to be broken for RabbitMQ
right now. The PR keeps the upgrade but reverts to the same
push-based implementation as in 4.x:
https://github.com/apache/beam/pull/9977 )
Regardless, in trying to get the pull-based API to work, I'm finding
the interactions between RabbitMQ and Beam around CheckpointMark to
be fundamentally impossible to implement, so I'm hoping for some
input here.
CheckpointMark itself must be Serializable; presumably this means it
gets shuffled around between nodes. However, 'Channel', the tunnel
through which it communicates with Rabbit to ack messages and
finalize the checkpoint, is not Serializable. As in most other
CheckpointMark implementations, the Channel is 'transient'. When a
new CheckpointMark is instantiated, it's given a Channel. If an
existing one is supplied to the Reader's constructor (part of the
'startReader()' interface), the channel is overwritten.
*However*, Rabbit does not support 'ack'ing messages on a channel
other than the one that consumed them in the first place.
Attempting to do so results in a '406 (PRECONDITION-FAILED) -
unknown delivery tag' error. (See
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
).
Truthfully, I don't really understand how the current implementation
is working; it seems like a happy accident. But I'm hoping someone
could help me debug this and work out how to bridge Beam's
reusable/serializable CheckpointMark requirement with this
limitation of Rabbit.
Thanks,
-Daniel Robert