scwhittle commented on a change in pull request #14458:
URL: https://github.com/apache/beam/pull/14458#discussion_r610444222



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
##########
@@ -141,6 +138,11 @@ public void drainAndBlock() throws Exception {
     return this.mainClient.send(apiServiceDescriptor, outputLocation, coder);
   }
 
+  /** Resets this object so that it may be reused. */
+  public void reset() {
+    inboundDataClients.clear();

Review comment:
       I looked into that some more but it isn't easy since the queue reference 
has made it's way into various places as part of the cached BundleProcessor.  I 
modified things so that the created QueueingFnDataReceiver are bound to an 
explicit queue instead of the QueueingBeamFnDataClient itself.  This means that 
in reset we can just create a new queue and the identified race doesn't matter. 
 Clients that have been failed but maybe not yet done will add to the orphaned 
queue (or if the queue is full they will abort once they notice they are 
failed).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to