scwhittle commented on a change in pull request #14480:
URL: https://github.com/apache/beam/pull/14480#discussion_r610474458



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
##########
@@ -98,35 +117,53 @@ private boolean allDone() {
    *
   * <p>All {@link InboundDataClient}s will be failed if processing throws an exception.
   *
-   * <p>This method is NOT thread safe. This should only be invoked by a single thread, and is
-   * intended for use with a newly constructed QueueingBeamFnDataClient in {@link
-   * ProcessBundleHandler#processBundle}.
+   * <p>This method is NOT thread safe. This should only be invoked once by a single thread.
    */
   public void drainAndBlock() throws Exception {
+    // If all of the inbound clients have already added to the queue, we don't
+    // use the poison to stop and instead poll. This avoids a possible deadlock
+    // if the queue is full and we are unable to queue the poison.
+    boolean requirePoison;
+    synchronized (inboundDataClients) {
+      Preconditions.checkState(!isDraining);
+      isDraining = true;
+      requirePoison = !inboundDataClients.isEmpty();

Review comment:
       There are two possibilities:
   - drainAndBlock is called while clients are still active -> the last client to complete is responsible for adding the poison.
   - drainAndBlock is called after all clients have completed -> no poison is added, and drainAndBlock just drains the existing queue.
   
   I took a stab at improving the comments throughout.
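
   For readers following along, here is a minimal standalone sketch of those two cases. It is not the Beam code: the class name `PoisonDrainSketch`, the String client ids, and the choice to enqueue the poison outside the lock are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch; names are illustrative and are not the Beam classes. */
class PoisonDrainSketch {
  // Sentinel compared by identity; it tells the draining thread to stop.
  private static final Object POISON = new Object();

  private final BlockingQueue<Object> queue;
  private final Set<String> activeClients = new HashSet<>();
  private boolean isDraining = false;

  PoisonDrainSketch(int capacity) {
    this.queue = new LinkedBlockingQueue<>(capacity);
  }

  /** Registers a client; not allowed once draining has started. */
  void register(String client) {
    synchronized (activeClients) {
      if (isDraining) {
        throw new IllegalStateException("already draining");
      }
      activeClients.add(client);
    }
  }

  /** Enqueues a value, blocking while the bounded queue is full. */
  void put(Object value) throws InterruptedException {
    queue.put(value);
  }

  /** Marks a client complete; the last one adds the poison if a drain is running. */
  void complete(String client) throws InterruptedException {
    boolean addPoison;
    synchronized (activeClients) {
      if (!activeClients.remove(client)) {
        throw new IllegalStateException("unknown client: " + client);
      }
      addPoison = activeClients.isEmpty() && isDraining;
    }
    if (addPoison) {
      // Assumption of this sketch: enqueue outside the lock so a full queue
      // cannot block other threads that need the lock.
      queue.put(POISON);
    }
  }

  /** Drains the queue; must be called at most once. */
  List<Object> drainAndBlock() throws InterruptedException {
    boolean requirePoison;
    synchronized (activeClients) {
      if (isDraining) {
        throw new IllegalStateException("drainAndBlock called twice");
      }
      isDraining = true;
      requirePoison = !activeClients.isEmpty();
    }
    List<Object> drained = new ArrayList<>();
    if (requirePoison) {
      // Case 1: clients are still active, so block until the last one adds the poison.
      for (Object v = queue.take(); v != POISON; v = queue.take()) {
        drained.add(v);
      }
    } else {
      // Case 2: every client already completed, so poll until the queue is empty.
      for (Object v = queue.poll(); v != null; v = queue.poll()) {
        drained.add(v);
      }
    }
    return drained;
  }
}
```

   Because the decision is made under the same lock that `complete` takes, exactly one of the two paths applies: either the drain saw an active client (and that client, or a later one, will add the poison), or it saw none and never waits for a poison that would not arrive.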

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
##########
@@ -71,23 +85,28 @@ public InboundDataClient receive(
     InboundDataClient inboundDataClient =
         this.mainClient.receive(apiServiceDescriptor, inputLocation, queueingConsumer);
     queueingConsumer.inboundDataClient = inboundDataClient;
-    this.inboundDataClients.computeIfAbsent(
-        inboundDataClient, (InboundDataClient idcToStore) -> idcToStore);
+    synchronized (inboundDataClients) {
+      Preconditions.checkState(!isDraining);
+      if (this.inboundDataClients.add(inboundDataClient)) {
+        inboundDataClient.runWhenComplete(() -> completeInbound(inboundDataClient));
+      }
+    }
     return inboundDataClient;
   }
 
-  // Returns true if all the InboundDataClients have finished or cancelled and no values
-  // remain on the queue.
-  private boolean allDone() {
-    for (InboundDataClient inboundDataClient : inboundDataClients.keySet()) {
-      if (!inboundDataClient.isDone()) {
-        return false;
+  private void completeInbound(InboundDataClient client) {
+    Preconditions.checkState(client.isDone());
+    boolean addPoison = false;
+    synchronized (inboundDataClients) {
+      Preconditions.checkState(inboundDataClients.remove(client));
+      finishedClients.add(client);
+      if (inboundDataClients.isEmpty() && isDraining) {
+        addPoison = true;

Review comment:
       Yes. If this was the last client and drainAndBlock is already running, the poison is what stops drainAndBlock.
   I improved the comments throughout, PTAL.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

