amaliujia commented on a change in pull request #14480:
URL: https://github.com/apache/beam/pull/14480#discussion_r610068648



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
##########
@@ -71,23 +85,28 @@ public InboundDataClient receive(
     InboundDataClient inboundDataClient =
         this.mainClient.receive(apiServiceDescriptor, inputLocation, queueingConsumer);
     queueingConsumer.inboundDataClient = inboundDataClient;
-    this.inboundDataClients.computeIfAbsent(
-        inboundDataClient, (InboundDataClient idcToStore) -> idcToStore);
+    synchronized (inboundDataClients) {
+      Preconditions.checkState(!isDraining);
+      if (this.inboundDataClients.add(inboundDataClient)) {
+        inboundDataClient.runWhenComplete(() -> completeInbound(inboundDataClient));
+      }
+    }
     return inboundDataClient;
   }
 
-  // Returns true if all the InboundDataClients have finished or cancelled and no values
-  // remain on the queue.
-  private boolean allDone() {
-    for (InboundDataClient inboundDataClient : inboundDataClients.keySet()) {
-      if (!inboundDataClient.isDone()) {
-        return false;
+  private void completeInbound(InboundDataClient client) {
+    Preconditions.checkState(client.isDone());
+    boolean addPoison = false;
+    synchronized (inboundDataClients) {
+      Preconditions.checkState(inboundDataClients.remove(client));
+      finishedClients.add(client);
+      if (inboundDataClients.isEmpty() && isDraining) {
+        addPoison = true;

Review comment:
       Does the `POISON` element signal the loop in `drainAndBlock` to stop?
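
       For context, the question is about the poison-pill pattern: a
       sentinel placed on a blocking queue so that the consumer loop knows
       no more elements will arrive. Below is a minimal sketch of that
       pattern only. `PoisonPillSketch`, its `POISON` object, and this
       simplified `drainAndBlock` are hypothetical stand-ins; the real
       `drainAndBlock` in QueueingBeamFnDataClient is not part of this hunk
       and may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonPillSketch {
  // Hypothetical sentinel: compared by identity, never a real data element.
  static final Object POISON = new Object();

  // Simplified stand-in for a drain loop: takes elements until the sentinel
  // is seen, then returns everything consumed before it.
  static List<Object> drainAndBlock(BlockingQueue<Object> queue)
      throws InterruptedException {
    List<Object> consumed = new ArrayList<>();
    while (true) {
      Object next = queue.take(); // blocks until an element is available
      if (next == POISON) {
        return consumed; // no more input is expected, so stop the loop
      }
      consumed.add(next);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    queue.put("a");
    queue.put("b");
    // Assumption for illustration: the sentinel is enqueued once the last
    // inbound client has completed while draining.
    queue.put(POISON);
    System.out.println(drainAndBlock(queue));
  }
}
```

       In this sketch the sentinel lets the consumer block on `take()`
       without polling each client for completion, which appears to be the
       role `addPoison` plays in the diff above.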



