youngoli commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r741625279
##########
File path:
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
if (value != POISON_PILL) {
outboundObserver.onNext(value);
} else {
+ outboundObserver.onCompleted();
return;
}
}
phaser.awaitAdvance(currentPhase);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException(e);
+ } catch (OnErrorException e) {
+ outboundObserver.onError(e.getCause());
Review comment:
As far as I can tell, this catch block is the only reason to have
`OnErrorException`, and its only effect seems to be avoiding a redundant
`queue.cancel` (the queue gets cancelled in `onError` and then again here). Is
calling `queue.cancel` twice enough of a problem to warrant a dedicated
exception type?
##########
File path:
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
* becomes ready.
*/
@ThreadSafe
-@SuppressWarnings({
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object>
implements StreamObserver<T> {
Review comment:
Just to make sure I'm understanding this class correctly: it essentially
works across two separate threads, right? One thread makes the calls to the
public API (i.e. the methods of `StreamObserver`: `onNext`, `onCompleted`, and
`onError`), while the other is the executor thread running `drainQueue`, which
is the thread that actually does all the interaction with the underlying
`outboundObserver`.
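To make the question concrete, here is a minimal sketch of the two-thread structure as I read it. It is not the code in this PR: `Observer`, `BufferingSketch`, `ErrorMarker`, and `demo` are hypothetical names, the interface is a local stand-in for gRPC's `StreamObserver`, and a plain `LinkedBlockingQueue` replaces the cancellable queue and the phaser-based readiness check.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TwoThreadSketch {
  /** Hypothetical stand-in for gRPC's StreamObserver, so the sketch has no dependencies. */
  interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  /** Simplified buffering observer: callers only enqueue, one executor thread drains. */
  static final class BufferingSketch<T> implements Observer<T> {
    private static final Object POISON_PILL = new Object();

    /** Hypothetical wrapper used to pass an error through the queue. */
    private static final class ErrorMarker {
      final Throwable cause;
      ErrorMarker(Throwable cause) { this.cause = cause; }
    }

    private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final Observer<T> outbound;

    BufferingSketch(Observer<T> outbound, ExecutorService executor) {
      this.outbound = outbound;
      executor.submit(this::drainQueue); // thread 2: the only one touching outbound
    }

    @SuppressWarnings("unchecked")
    private void drainQueue() {
      try {
        while (true) {
          Object value = queue.take();
          if (value == POISON_PILL) {
            outbound.onCompleted();
            return;
          }
          if (value instanceof ErrorMarker) {
            outbound.onError(((ErrorMarker) value).cause);
            return;
          }
          outbound.onNext((T) value);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    // Thread 1: the public API never calls outbound directly.
    @Override public void onNext(T value) { queue.add(value); }
    @Override public void onError(Throwable t) { queue.add(new ErrorMarker(t)); }
    @Override public void onCompleted() { queue.add(POISON_PILL); }
  }

  /** Pushes two values and completes; returns what the outbound observer saw. */
  static List<String> demo() {
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    ExecutorService executor = Executors.newSingleThreadExecutor();
    BufferingSketch<String> observer = new BufferingSketch<>(new Observer<String>() {
      @Override public void onNext(String value) { events.add("next:" + value); }
      @Override public void onError(Throwable t) { events.add("error"); }
      @Override public void onCompleted() { events.add("completed"); }
    }, executor);
    observer.onNext("a");
    observer.onNext("b");
    observer.onCompleted();
    executor.shutdown();
    try {
      executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [next:a, next:b, completed]
  }
}
```

If that matches the intent, then the caller-facing methods only ever enqueue, and every call on `outboundObserver` happens on the drain thread.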