amaliujia commented on a change in pull request #14480:
URL: https://github.com/apache/beam/pull/14480#discussion_r610826519



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
##########
@@ -98,35 +134,60 @@ private boolean allDone() {
    *
    * <p>All {@link InboundDataClient}s will be failed if processing throws an 
exception.
    *
-   * <p>This method is NOT thread safe. This should only be invoked by a 
single thread, and is
-   * intended for use with a newly constructed QueueingBeamFnDataClient in 
{@link
-   * ProcessBundleHandler#processBundle}.
+   * <p>This method is NOT thread safe. This should only be invoked once by a 
single thread. See
+   * class comment.
    */
   public void drainAndBlock() throws Exception {
+    // There are several ways drainAndBlock completes:
+    // - processing elements fails -> all inbound clients are failed and 
exception thrown
+    // - draining starts while inbound clients are active -> the last client 
will poision the queue
+    //   to notify that no more elements will arrive
+    // - draining starts without any remaining clients -> we just need to 
drain the queue and then
+    //   are done as no further elements will arrive.
+    boolean requirePoison;
+    synchronized (inboundDataClients) {
+      Preconditions.checkState(!isDraining);
+      isDraining = true;
+      // An alternative would be to add poison here if there were no remaining 
clients. However this
+      // could deadlock if the queue was full since this thread is responsible 
for consuming it.
+      requirePoison = !inboundDataClients.isEmpty();
+    }
     while (true) {
       try {
-        ConsumerAndData tuple = queue.poll(200, TimeUnit.MILLISECONDS);
-        if (tuple != null) {
-          // Forward to the consumers who cares about this data.
-          tuple.consumer.accept(tuple.data);
+        ConsumerAndData<?> tuple;
+        if (requirePoison) {
+          tuple = queue.take();
+          if (tuple == POISON) {
+            break;
+          }

Review comment:
       Is there a need to check `null` here? (though maybe `null` will be hit 
before enqueuing).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to