This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch release-2.30.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.30.0 by this push:
     new efa150f  [BEAM-12118] Fix race introduced in QueueingBeamFnDataClient triggering failed precondition. (#14668)
     new ae3525b  Merge pull request #14728 from ihji/cherry-pick-12118
efa150f is described below

commit efa150f3b971b513b82b50830a417f2b9adaa151
Author: scwhittle <[email protected]>
AuthorDate: Thu Apr 29 09:30:08 2021 -0700

    [BEAM-12118] Fix race introduced in QueueingBeamFnDataClient triggering failed precondition. (#14668)
---
 .../beam/fn/harness/data/QueueingBeamFnDataClient.java     | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
index 21d0b14..11dea66 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
@@ -83,12 +83,22 @@ public class QueueingBeamFnDataClient implements BeamFnDataClient {
     // empty in which case it returns null.
     @Nullable
     ConsumerAndData<?> take() throws InterruptedException {
+      // We first poll without blocking to optimize for the case there is data.
+      // If there is no data we end up blocking on take() and thus the extra
+      // poll doesn't matter.
       @Nullable ConsumerAndData<?> result = queue.poll();
       if (result == null) {
         if (closed.get()) {
-          return null;
+          // Poll again to ensure that there is nothing in the queue. Once we observe closed as true
+          // we are guaranteed no additional elements other than the POISON will be added. However
+          // we can't rely on the previous poll result as it could race with additional offers and
+          // close.
+          result = queue.poll();
+        } else {
+          // We are not closed so we perform a blocking take. We are guaranteed that additional
+          // elements will be offered or the POISON will be added by close to unblock this thread.
+          result = queue.take();
         }
-        result = queue.take();
       }
       if (result == POISON) {
         return null;

Reply via email to