amaliujia commented on a change in pull request #14480:
URL: https://github.com/apache/beam/pull/14480#discussion_r610067279
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
##########
@@ -71,23 +85,28 @@ public InboundDataClient receive(
InboundDataClient inboundDataClient =
this.mainClient.receive(apiServiceDescriptor, inputLocation,
queueingConsumer);
queueingConsumer.inboundDataClient = inboundDataClient;
- this.inboundDataClients.computeIfAbsent(
- inboundDataClient, (InboundDataClient idcToStore) -> idcToStore);
+ synchronized (inboundDataClients) {
+ Preconditions.checkState(!isDraining);
+ if (this.inboundDataClients.add(inboundDataClient)) {
+ inboundDataClient.runWhenComplete(() ->
completeInbound(inboundDataClient));
+ }
+ }
return inboundDataClient;
}
- // Returns true if all the InboundDataClients have finished or cancelled and
no values
- // remain on the queue.
- private boolean allDone() {
- for (InboundDataClient inboundDataClient : inboundDataClients.keySet()) {
- if (!inboundDataClient.isDone()) {
- return false;
+ private void completeInbound(InboundDataClient client) {
+ Preconditions.checkState(client.isDone());
+ boolean addPoison = false;
+ synchronized (inboundDataClients) {
+ Preconditions.checkState(inboundDataClients.remove(client));
+ finishedClients.add(client);
+ if (inboundDataClients.isEmpty() && isDraining) {
+ addPoison = true;
Review comment:
I found a hard to understand why adding this boolean flag? To indicate
this is draining?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]