This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch release-2.30.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.30.0 by this push:
new efa150f [BEAM-12118] Fix race introduced in QueueingBeamFnDataClient
triggering failed precondition. (#14668)
new ae3525b Merge pull request #14728 from ihji/cherry-pick-12118
efa150f is described below
commit efa150f3b971b513b82b50830a417f2b9adaa151
Author: scwhittle <[email protected]>
AuthorDate: Thu Apr 29 09:30:08 2021 -0700
[BEAM-12118] Fix race introduced in QueueingBeamFnDataClient triggering
failed precondition. (#14668)
---
.../beam/fn/harness/data/QueueingBeamFnDataClient.java | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
index 21d0b14..11dea66 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
@@ -83,12 +83,22 @@ public class QueueingBeamFnDataClient implements BeamFnDataClient {
// empty in which case it returns null.
@Nullable
ConsumerAndData<?> take() throws InterruptedException {
+ // We first poll without blocking to optimize for the case there is data.
+ // If there is no data we end up blocking on take() and thus the extra
+ // poll doesn't matter.
@Nullable ConsumerAndData<?> result = queue.poll();
if (result == null) {
if (closed.get()) {
- return null;
+ // Poll again to ensure that there is nothing in the queue. Once we observe closed as true
+ // we are guaranteed no additional elements other than the POISON will be added. However
+ // we can't rely on the previous poll result as it could race with additional offers and
+ // close.
+ result = queue.poll();
+ } else {
+ // We are not closed so we perform a blocking take. We are guaranteed that additional
+ // elements will be offered or the POISON will be added by close to unblock this thread.
+ result = queue.take();
}
- result = queue.take();
}
if (result == POISON) {
return null;