scwhittle commented on a change in pull request #14458:
URL: https://github.com/apache/beam/pull/14458#discussion_r610444222
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
##########
@@ -141,6 +138,11 @@ public void drainAndBlock() throws Exception {
return this.mainClient.send(apiServiceDescriptor, outputLocation, coder);
}
+ /** Resets this object so that it may be reused. */
+ public void reset() {
+ inboundDataClients.clear();
Review comment:
I looked into that some more, but it isn't easy since the queue reference
has made its way into various places as part of the cached BundleProcessor. I
modified things so that each created QueueingFnDataReceiver is bound to an
explicit queue instead of to the QueueingBeamFnDataClient itself. This means
that in reset we can just create a new queue, and the identified race no longer
matters: clients that have been failed but may not yet be done will add to the
orphaned queue (or, if that queue is full, they will abort once they notice
they have been failed).
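
A minimal sketch of the idea, for illustration only. This is not the code in
the PR: ResettableQueueingClient, newReceiver, pending, and the capacity of 100
are hypothetical names and values, and the "abort when the queue is full" path
is simplified to a non-blocking offer that drops the element.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical stand-in for the queueing client; not the actual Beam class. */
class ResettableQueueingClient {
  // Assumed capacity, chosen only for this sketch.
  private static final int CAPACITY = 100;

  // Replaced wholesale on reset(); volatile so other threads see the new reference.
  private volatile BlockingQueue<String> queue = new ArrayBlockingQueue<>(CAPACITY);

  /** Returns a receiver bound to the queue that is current at creation time. */
  Consumer<String> newReceiver() {
    final BlockingQueue<String> bound = queue; // captured once, never re-read
    return element -> {
      // Simplification: offer() does not block; a full queue drops the element.
      bound.offer(element);
    };
  }

  /** Swaps in a fresh queue; receivers created earlier keep the old one. */
  void reset() {
    queue = new ArrayBlockingQueue<>(CAPACITY);
  }

  /** Number of elements waiting in the current (post-reset) queue. */
  int pending() {
    return queue.size();
  }
}
```

With this shape, a receiver created before reset() keeps writing to the queue
it captured, so a late element from a failed client lands in the orphaned queue
and never shows up in the queue used by the next bundle.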
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]