Alex Amato created BEAM-6758:
--------------------------------
Summary: Potential Bug, BeamFnDataClient future finalization
Key: BEAM-6758
URL: https://issues.apache.org/jira/browse/BEAM-6758
Project: Beam
Issue Type: New Feature
Components: java-fn-execution
Reporter: Alex Amato
Just wanted to write down this thought before I forget. Maybe a second person
can think through this and close it out if I am wrong.
I think that there is a bug here, which could lead to data loss when
processing the last few elements in a bundle.
I recently learned that a Java CompletableFuture cannot be completed
exceptionally (i.e. failed) once the future has already completed. So in
QueueingBeamFnDataClient, if the future is already marked done, we cannot fail
it. I noticed this behaviour in a unit test for a bundle receiving data for
multiple inputs (where I could not fail one of them exceptionally, since it was
already marked done).
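To make the CompletableFuture behaviour concrete, here is a minimal standalone
sketch. It uses only java.util.concurrent and does not touch any Beam classes;
the class and method names (CompletedFutureDemo, failAfterCompletion,
failBeforeCompletion) are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of Beam.
class CompletedFutureDemo {

  /** Tries to fail a future that has already completed normally. */
  static boolean failAfterCompletion() {
    CompletableFuture<Void> inbound = new CompletableFuture<>();
    inbound.complete(null); // stands in for "all elements were received"

    // completeExceptionally returns false and has no effect here,
    // because the future is already done.
    boolean transitioned =
        inbound.completeExceptionally(new RuntimeException("processing failed"));

    return transitioned || inbound.isCompletedExceptionally();
  }

  /** Fails a future that is still pending, for contrast. */
  static boolean failBeforeCompletion() {
    CompletableFuture<Void> inbound = new CompletableFuture<>();
    boolean transitioned =
        inbound.completeExceptionally(new RuntimeException("processing failed"));
    return transitioned && inbound.isCompletedExceptionally();
  }

  public static void main(String[] args) {
    System.out.println("failed after completion:  " + failAfterCompletion());
    System.out.println("failed before completion: " + failBeforeCompletion());
  }
}
```

In the first case the failure is silently dropped, so anything that only checks
isDone() on that future would see a normal completion.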
The potential bug I see would occur if the future has already completed before
we try to fail it for an element (I think the allDone method of the
QueueingBeamFnDataClient relies on the state of these futures). Imagine
processing the last few elements in a bundle: the InboundDataClient is marked
completed because there are no more elements coming in on the gRPC channel, but
then we fail while processing one of those elements.
I could be wrong if the inbound data client futures are somehow guaranteed
not to complete until we finish processing the elements themselves. But I don't
think this is the case. I think there is some code (GRPCBeamFnDataClient) which
will complete the future once it has received all the elements on the channel.
Also, we might have other code which mitigates this problem entirely, because
ProcessBundleHandler.processBundle will also throw an exception in this
case, which should be enough to fail the bundle and hopefully prevent data loss.
One potential solution is to have two futures in the InboundDataClient:
- waitUntilAllElementsReceivedOnGrpc
- waitUntilAllElementsFinishedProcessing (which can be marked in the
QueueingBeamFnDataClient).
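A rough sketch of the two-future idea, just to show the shape of it. This is
not the actual InboundDataClient interface; the class TwoPhaseInbound and all
of its method names are hypothetical, and a real change would need to fit the
existing Beam interfaces.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an inbound data client that tracks two phases.
class TwoPhaseInbound {
  // Completed once no more elements will arrive on the channel.
  private final CompletableFuture<Void> allElementsReceived = new CompletableFuture<>();
  // Completed (or failed) once every received element has been processed.
  private final CompletableFuture<Void> allElementsProcessed = new CompletableFuture<>();

  /** Would be called by the gRPC-facing code when the stream is finished. */
  void onStreamClosed() {
    allElementsReceived.complete(null);
  }

  /** Would be called by the queueing/processing code on success. */
  void onProcessingFinished() {
    allElementsProcessed.complete(null);
  }

  /** Would be called by the queueing/processing code on failure. */
  void onProcessingFailed(Throwable cause) {
    // Still pending even if the stream already closed, so the failure sticks.
    allElementsProcessed.completeExceptionally(cause);
  }

  CompletableFuture<Void> received() {
    return allElementsReceived;
  }

  CompletableFuture<Void> processed() {
    return allElementsProcessed;
  }

  public static void main(String[] args) {
    TwoPhaseInbound inbound = new TwoPhaseInbound();
    inbound.onStreamClosed();
    inbound.onProcessingFailed(new RuntimeException("failed on last element"));
    System.out.println("received done:    " + inbound.received().isDone());
    System.out.println("processed failed: " + inbound.processed().isCompletedExceptionally());
  }
}
```

Because the processing future is separate, a failure on the last element can
still be recorded after the receive side has completed. A caller that waits for
the bundle would then wait on the processing future rather than the receive one.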
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)