amaliujia commented on a change in pull request #14480:
URL: https://github.com/apache/beam/pull/14480#discussion_r610070432



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
##########
@@ -98,35 +117,53 @@ private boolean allDone() {
    *
    * <p>All {@link InboundDataClient}s will be failed if processing throws an 
exception.
    *
-   * <p>This method is NOT thread safe. This should only be invoked by a 
single thread, and is
-   * intended for use with a newly constructed QueueingBeamFnDataClient in 
{@link
-   * ProcessBundleHandler#processBundle}.
+   * <p>This method is NOT thread safe. This should only be invoked once by a 
single thread.
    */
   public void drainAndBlock() throws Exception {
+    // If all of the inbound clients have already added to the queue, we don't
+    // use the poison to stop and instead poll. This avoids a possible deadlock
+    // if the queue is full and we are unable to queue the poison.
+    boolean requirePoison;
+    synchronized (inboundDataClients) {
+      Preconditions.checkState(!isDraining);
+      isDraining = true;
+      requirePoison = !inboundDataClients.isEmpty();

Review comment:
       question:
   
   addPosion above is set based on `inboundDataClients.isEmpty() && 
isDraining`, and why requirePosion does not require the same check?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to