[
https://issues.apache.org/jira/browse/BEAM-3811?focusedWorklogId=676038&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-676038
]
ASF GitHub Bot logged work on BEAM-3811:
----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Nov/21 01:53
Start Date: 04/Nov/21 01:53
Worklog Time Spent: 10m
Work Description: youngoli commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r741625279
##########
File path:
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
if (value != POISON_PILL) {
outboundObserver.onNext(value);
} else {
+ outboundObserver.onCompleted();
return;
}
}
phaser.awaitAdvance(currentPhase);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException(e);
+ } catch (OnErrorException e) {
+ outboundObserver.onError(e.getCause());
Review comment:
As far as I can tell, this spot in the code is the only reason to have
`OnErrorException`, and the only difference seems to be to avoid a redundant
`queue.cancel` (it gets cancelled in `onError` and then again here). Is calling
`queue.cancel` twice enough of a problem to warrant this?
##########
File path:
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
* becomes ready.
*/
@ThreadSafe
-@SuppressWarnings({
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object>
implements StreamObserver<T> {
Review comment:
Just to make sure I'm understanding this class correctly, it is
essentially working on two separate threads, right? One thread is the calls to
the public API (i.e. the methods of `StreamObserver`, so `onNext`,
`onCompleted`, and `onError`), while the other thread is the executor calling
`drainQueue` which is the thread that actually does all the interaction with
the underlying `outboundObserver`.
Am I understanding this class correctly?
##########
File path:
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
if (value != POISON_PILL) {
outboundObserver.onNext(value);
} else {
+ outboundObserver.onCompleted();
return;
}
}
phaser.awaitAdvance(currentPhase);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException(e);
+ } catch (OnErrorException e) {
+ outboundObserver.onError(e.getCause());
Review comment:
Ack. I was just thinking that the general Exception catch already calls
`outboundObserver.onError(e)`, so if you cancel the queue with the top-level
exception it would be the same. Or am I misunderstanding what happens with
`getCause`? I'm reading it as getting the original exception that was nested in
the `OnErrorException`, but maybe it's a more top level exception than that?
Regardless, this is all nit-picking at this point. Not worth blocking
approval on.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 676038)
Time Spent: 3h 40m (was: 3.5h)
> Add a CancellableQueue to make close methods safer
> --------------------------------------------------
>
> Key: BEAM-3811
> URL: https://issues.apache.org/jira/browse/BEAM-3811
> Project: Beam
> Issue Type: Bug
> Components: runner-core
> Reporter: Thomas Groh
> Priority: P3
> Labels: portability
> Time Spent: 3h 40m
> Remaining Estimate: 0h
>
> There are multiple locations in which, on a call to {{close}} within some
> portability service, we wish to close all of our outstanding clients.
> However, the call to {{close}} can be interleaved with calls to the method
> which creates a new client. Without having an explicit thread-safe signal
> from the collection of pending clients, there's no way to ensure that all of
> the clients for a service are closed when that service is closed.
>
> As a result, some clients are only terminated by the forceful termination of
> the server, killing the connection.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)