[https://issues.apache.org/jira/browse/BEAM-12118?focusedWorklogId=580556&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-580556]
ASF GitHub Bot logged work on BEAM-12118:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Apr/21 12:58
Start Date: 10/Apr/21 12:58
Worklog Time Spent: 10m
Work Description: scwhittle commented on a change in pull request #14480:
URL: https://github.com/apache/beam/pull/14480#discussion_r611042213
##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
##########
@@ -98,35 +134,60 @@ private boolean allDone() {
*
* <p>All {@link InboundDataClient}s will be failed if processing throws an exception.
*
- * <p>This method is NOT thread safe. This should only be invoked by a single thread, and is
- * intended for use with a newly constructed QueueingBeamFnDataClient in {@link
- * ProcessBundleHandler#processBundle}.
+ * <p>This method is NOT thread safe. This should only be invoked once by a single thread. See
+ * class comment.
*/
public void drainAndBlock() throws Exception {
+ // There are several ways drainAndBlock completes:
+ // - processing elements fails -> all inbound clients are failed and exception thrown
+ // - draining starts while inbound clients are active -> the last client will poison the queue
+ // to notify that no more elements will arrive
+ // - draining starts without any remaining clients -> we just need to drain the queue and then
+ // are done as no further elements will arrive.
+ boolean requirePoison;
+ synchronized (inboundDataClients) {
+ Preconditions.checkState(!isDraining);
+ isDraining = true;
+ // An alternative would be to add poison here if there were no remaining clients. However this
+ // could deadlock if the queue was full since this thread is responsible for consuming it.
+ requirePoison = !inboundDataClients.isEmpty();
+ }
while (true) {
try {
- ConsumerAndData tuple = queue.poll(200, TimeUnit.MILLISECONDS);
- if (tuple != null) {
- // Forward to the consumers who cares about this data.
- tuple.consumer.accept(tuple.data);
+ ConsumerAndData<?> tuple;
+ if (requirePoison) {
+ tuple = queue.take();
+ if (tuple == POISON) {
+ break;
+ }
Review comment:
LinkedBlockingQueue doesn't allow null elements, and take() blocks
until there is an element to remove. If it were possible, I think nullness
annotations would catch it too.
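A minimal, self-contained sketch of the point made in the review comment, assuming a plain LinkedBlockingQueue<String> and a hypothetical POISON sentinel (the PR's real queue holds ConsumerAndData<?> tuples, and PoisonPillDrain/drain are illustrative names only). It shows that offering null is rejected with a NullPointerException and that take() blocks until an element arrives, so the drain loop never sees a null and ends only on the sentinel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonPillDrain {
  // Hypothetical sentinel, compared by identity (new String avoids interning).
  static final String POISON = new String("POISON");

  /** Drains elements until the sentinel arrives; take() never returns null. */
  static List<String> drain(BlockingQueue<String> queue) throws InterruptedException {
    List<String> seen = new ArrayList<>();
    while (true) {
      String element = queue.take(); // blocks until an element is available
      if (element == POISON) {
        break;
      }
      seen.add(element);
    }
    return seen;
  }

  public static void main(String[] args) throws Exception {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(4);

    // LinkedBlockingQueue rejects null elements outright.
    boolean nullRejected = false;
    try {
      queue.offer(null);
    } catch (NullPointerException e) {
      nullRejected = true;
    }
    System.out.println("null rejected: " + nullRejected);

    // The producer enqueues its data, then the sentinel once it is done.
    Thread producer = new Thread(() -> {
      try {
        queue.put("a");
        queue.put("b");
        queue.put(POISON);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();
    List<String> drained = drain(queue);
    producer.join();
    System.out.println("drained: " + drained);
  }
}
```

Running main should print "null rejected: true" followed by "drained: [a, b]". Because the consumer blocks in take() instead of polling with a timeout, it wakes as soon as the sentinel is enqueued.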
--
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 580556)
Time Spent: 3h 20m (was: 3h 10m)
> QueuingBeamFnDataClient adds polling latency to completing bundle processing
> ----------------------------------------------------------------------------
>
> Key: BEAM-12118
> URL: https://issues.apache.org/jira/browse/BEAM-12118
> Project: Beam
> Issue Type: Bug
> Components: java-fn-execution
> Reporter: Sam Whittle
> Assignee: Sam Whittle
> Priority: P2
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> Currently, the inboundDataClients are registered with receive, and they add
> data to a queue. There is no explicit indication from the clients that they
> will stop adding values to the queue.
> Within QueueingBeamFnDataClient.drainAndBlock, the queue is therefore polled
> with a timeout, and if nothing is present, all clients are checked to see whether they are complete.
> This design forces an unfortunate tradeoff when choosing the poll timeout:
> - a small timeout wastes CPU
> - a larger timeout adds latency before we notice that processing has completed
> With the existing InboundDataClient interface, we could have a separate
> thread call awaitCompletion on all of the clients and then shut down the queue
> (perhaps by adding a poison pill).
> Alternatively, we could modify the InboundDataClient interface to allow registering interest
> in when the client is done producing elements. The existing clients all seem to be
> based on futures, which allow that.
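The second proposal can be sketched with CompletableFuture completion callbacks. This assumes each client exposes a hypothetical completion future; CompletionDrivenDrain, register and enqueue are illustrative names, and the real InboundDataClient interface and the PR's code may differ. Once draining has started, the last client to complete enqueues a poison sentinel. If no clients remain when draining starts, the consumer just drains what is already queued, mirroring the requirePoison logic in the diff above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class CompletionDrivenDrain {
  private static final Object POISON = new Object(); // hypothetical sentinel
  // Unbounded, so add() never blocks the thread that enqueues POISON.
  private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
  private final Set<CompletableFuture<Void>> activeClients = new HashSet<>();
  private boolean isDraining = false; // guarded by activeClients

  /** Registers a hypothetical client whose completion is signalled by a future. */
  void register(CompletableFuture<Void> completion) {
    synchronized (activeClients) {
      activeClients.add(completion);
    }
    completion.whenComplete((ignored, error) -> {
      boolean poison;
      synchronized (activeClients) {
        activeClients.remove(completion);
        poison = isDraining && activeClients.isEmpty();
      }
      if (poison) {
        queue.add(POISON); // the last client to finish wakes the drainer
      }
    });
  }

  /** Called by clients before they complete; elements precede any POISON. */
  void enqueue(Object element) {
    queue.add(element);
  }

  /** Blocks without timed polling until all clients are done. */
  List<Object> drainAndBlock() throws InterruptedException {
    boolean requirePoison;
    synchronized (activeClients) {
      isDraining = true;
      requirePoison = !activeClients.isEmpty();
    }
    List<Object> drained = new ArrayList<>();
    if (requirePoison) {
      while (true) {
        Object element = queue.take();
        if (element == POISON) {
          break;
        }
        drained.add(element);
      }
    }
    // No clients were active, or the sentinel was seen: flush anything left.
    Object element;
    while ((element = queue.poll()) != null) {
      drained.add(element);
    }
    return drained;
  }

  public static void main(String[] args) throws Exception {
    CompletionDrivenDrain drain = new CompletionDrivenDrain();
    CompletableFuture<Void> client = new CompletableFuture<>();
    drain.register(client);
    Thread producer = new Thread(() -> {
      drain.enqueue("x");
      drain.enqueue("y");
      client.complete(null); // no more elements from this client
    });
    producer.start();
    System.out.println(drain.drainAndBlock());
    producer.join();
  }
}
```

Either interleaving prints [x, y]. If the client finishes before draining starts, no sentinel is needed and the queue is simply flushed. Otherwise its completion callback enqueues POISON after its elements. Checking isDraining and the remaining-client count under one lock is what rules out a lost wake-up.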
--
This message was sent by Atlassian Jira
(v8.3.4#803005)