kennknowles opened a new issue, #19257:
URL: https://github.com/apache/beam/issues/19257

   Just wanted to write down this thought before I forget. Maybe a second 
person can think through this and close it out if I am wrong.
   
   I think that there is a bug here, which could lead to data loss when 
processing the last few elements in a bundle.
   
    
   
   I recently learned that a Java CompletableFuture cannot be completed 
exceptionally (i.e. failed via completeExceptionally) once it has already 
completed. So in QueueingBeamFnDataClient, if the future is already marked 
done, we cannot fail it. I noticed this behaviour in a unit test for a bundle 
receiving data for multiple inputs (where I could not fail one of them 
exceptionally, since it was already marked done).
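
   The CompletableFuture behaviour described above can be reproduced in 
isolation. This is a minimal sketch using only the JDK; the class and method 
names (CompleteTwiceDemo, failAfterComplete) are made up for illustration and 
are not part of Beam.

```java
import java.util.concurrent.CompletableFuture;

final class CompleteTwiceDemo {
  /**
   * Completes the future normally, then tries to fail it.
   * Returns whether the failure was actually recorded.
   */
  static boolean failAfterComplete(CompletableFuture<Void> future) {
    // Stands in for "all elements were received".
    future.complete(null);
    // completeExceptionally only takes effect on a future that has not
    // completed yet; here it returns false and the exception is discarded.
    return future.completeExceptionally(new RuntimeException("processing failed"));
  }

  public static void main(String[] args) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    System.out.println("failure recorded: " + failAfterComplete(future));
    System.out.println("completed exceptionally: " + future.isCompletedExceptionally());
  }
}
```

   Anyone who later calls get() or join() on such a future sees a normal 
completion, so the failure has to be reported through some other path.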
   
    
   
   The potential bug I see would occur if the future is already completed 
before we fail the element (I think that we rely on this in the allDone method 
of the QueueingBeamFnDataClient). Imagine processing the last few elements in a 
bundle: the InboundDataClient is marked completed because there are no more 
elements coming in on the gRPC channel, but we then fail while processing one 
of those elements.
   
    
   
   I could be wrong, if the inbound data client futures are somehow guaranteed 
not to complete until we finish processing the elements themselves. But I don't 
think this is the case: I think there is some code (GRPCBeamFnDataClient) which 
will complete the future once it has received all the elements on the channel.
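
   To make the suspected sequence concrete, here is a single-threaded sketch 
of it. It does not use any Beam classes: the queue, the "inbound" future and 
the process method are hypothetical stand-ins for the queueing client, the 
inbound data client future and the element consumer, and the real code paths 
may differ.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class LostFailureSketch {
  /** Runs the scenario and returns true if the future ends up failed. */
  static boolean failureObserved() {
    Queue<String> queue = new ArrayDeque<>();
    CompletableFuture<Void> inbound = new CompletableFuture<>();

    // Receiving side: the last elements arrive, the channel is drained,
    // and the future is completed right away.
    queue.add("a");
    queue.add("b");
    inbound.complete(null);

    // Processing side: the queued elements are consumed only afterwards.
    while (!queue.isEmpty()) {
      String element = queue.remove();
      try {
        process(element);
      } catch (RuntimeException e) {
        // Too late: the future is already done, so this call has no effect.
        inbound.completeExceptionally(e);
      }
    }
    return inbound.isCompletedExceptionally();
  }

  /** Hypothetical consumer that fails on the last element. */
  static void process(String element) {
    if (element.equals("b")) {
      throw new IllegalStateException("cannot process " + element);
    }
  }

  public static void main(String[] args) {
    System.out.println("failure observed: " + failureObserved());
  }
}
```

   In this sketch the failure is only visible if the caller also propagates 
the exception directly, rather than relying on the future alone.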
   
   Also, we might have other code which mitigates this problem entirely, 
because ProcessBundleHandler.processBundle will also throw an exception in this 
case, which should be enough to fail the bundle and hopefully prevent data loss.
   
   One potential solution is to have two futures in the InboundDataClient:
   - waitUntilAllElementsReceivedOnGrpc
   - waitUntilAllElementsFinishedProcessing (which can be marked in the 
QueueingBeamFnDataClient).
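
   A rough sketch of what the two-future idea could look like follows. This 
is not Beam's InboundDataClient interface: TwoPhaseInboundState and all of its 
method names are hypothetical, and it only assumes that waiting for the bundle 
means waiting for both phases.

```java
import java.util.concurrent.CompletableFuture;

final class TwoPhaseInboundState {
  // Completed by the receiving side once the channel has delivered everything.
  private final CompletableFuture<Void> allElementsReceived = new CompletableFuture<>();
  // Completed by the processing side once every element has been consumed.
  private final CompletableFuture<Void> allElementsProcessed = new CompletableFuture<>();

  void markReceived() {
    allElementsReceived.complete(null);
  }

  void markProcessed() {
    allElementsProcessed.complete(null);
  }

  /** Fails whichever phases are still open; already completed phases are unchanged. */
  void fail(Throwable t) {
    allElementsReceived.completeExceptionally(t);
    allElementsProcessed.completeExceptionally(t);
  }

  boolean isReceived() {
    return allElementsReceived.isDone();
  }

  /** Blocks until both phases are done; throws CompletionException if either failed. */
  void awaitCompletion() {
    CompletableFuture.allOf(allElementsReceived, allElementsProcessed).join();
  }
}
```

   With this split, a processing failure that happens after the last element 
was received can still fail the second future, so awaitCompletion() surfaces 
it instead of returning normally.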
   
    
   
    
   
   Imported from Jira 
[BEAM-6758](https://issues.apache.org/jira/browse/BEAM-6758). Original Jira may 
contain additional context.
   Reported by: [email protected].

