amaliujia commented on a change in pull request #14480:
URL: https://github.com/apache/beam/pull/14480#discussion_r610067279



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
##########
@@ -71,23 +85,28 @@ public InboundDataClient receive(
     InboundDataClient inboundDataClient =
         this.mainClient.receive(apiServiceDescriptor, inputLocation, 
queueingConsumer);
     queueingConsumer.inboundDataClient = inboundDataClient;
-    this.inboundDataClients.computeIfAbsent(
-        inboundDataClient, (InboundDataClient idcToStore) -> idcToStore);
+    synchronized (inboundDataClients) {
+      Preconditions.checkState(!isDraining);
+      if (this.inboundDataClients.add(inboundDataClient)) {
+        inboundDataClient.runWhenComplete(() -> 
completeInbound(inboundDataClient));
+      }
+    }
     return inboundDataClient;
   }
 
-  // Returns true if all the InboundDataClients have finished or cancelled and 
no values
-  // remain on the queue.
-  private boolean allDone() {
-    for (InboundDataClient inboundDataClient : inboundDataClients.keySet()) {
-      if (!inboundDataClient.isDone()) {
-        return false;
+  private void completeInbound(InboundDataClient client) {
+    Preconditions.checkState(client.isDone());
+    boolean addPoison = false;
+    synchronized (inboundDataClients) {
+      Preconditions.checkState(inboundDataClients.remove(client));
+      finishedClients.add(client);
+      if (inboundDataClients.isEmpty() && isDraining) {
+        addPoison = true;

Review comment:
       I found a hard to understand why adding this boolean flag? To indicate 
this is draining?  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to