youngoli commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r741625279
##########
File path:
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
if (value != POISON_PILL) {
outboundObserver.onNext(value);
} else {
+ outboundObserver.onCompleted();
return;
}
}
phaser.awaitAdvance(currentPhase);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException(e);
+ } catch (OnErrorException e) {
+ outboundObserver.onError(e.getCause());
Review comment:
As far as I can tell, this catch block is the only reason `OnErrorException`
exists, and its only effect is to avoid a redundant `queue.cancel` (the queue
gets cancelled in `onError` and then again here). Is calling `queue.cancel`
twice enough of a problem to warrant a dedicated exception type?
##########
File path:
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
* becomes ready.
*/
@ThreadSafe
-@SuppressWarnings({
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object>
implements StreamObserver<T> {
Review comment:
Just to make sure I'm understanding this class correctly: it essentially
works across two separate threads, right? One thread makes the calls to the
public API (i.e. the methods of `StreamObserver`: `onNext`, `onCompleted`, and
`onError`), while the other is the executor thread running `drainQueue`, which
is the only thread that actually interacts with the underlying
`outboundObserver`.
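For reference, here is a minimal sketch of the two-thread pattern as I read it. It is not the code in this PR: `TwoThreadBufferSketch`, the `downstream` list (standing in for `outboundObserver`), and the use of a plain `LinkedBlockingQueue` are all assumptions made for illustration, and the phaser and cancellation logic are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in, not the Beam class: one caller thread, one drain thread. */
class TwoThreadBufferSketch {
  // Sentinel enqueued by onCompleted(); compared by identity, as in the diff.
  private static final Object POISON_PILL = new Object();

  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
  // Assumption: a list records what the real outboundObserver would receive.
  private final List<String> downstream;

  TwoThreadBufferSketch(ExecutorService executor, List<String> downstream) {
    this.downstream = downstream;
    // The drain loop runs on the executor's thread.
    executor.submit(this::drainQueue);
  }

  // Public API: runs on the caller's thread and only enqueues.
  void onNext(String value) {
    queue.add(value);
  }

  void onCompleted() {
    queue.add(POISON_PILL);
  }

  // Executor thread: the only place that touches the downstream.
  private void drainQueue() {
    try {
      while (true) {
        Object value = queue.take();
        if (value != POISON_PILL) {
          downstream.add("onNext:" + value);
        } else {
          downstream.add("onCompleted");
          return;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  static List<String> demo() throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    List<String> downstream = Collections.synchronizedList(new ArrayList<>());
    TwoThreadBufferSketch observer = new TwoThreadBufferSketch(executor, downstream);
    observer.onNext("a");
    observer.onNext("b");
    observer.onCompleted();
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    return downstream;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(demo());
  }
}
```

If that reading is right, the caller's thread never touches the downstream observer directly; everything it does goes through the queue.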
##########
File path:
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
if (value != POISON_PILL) {
outboundObserver.onNext(value);
} else {
+ outboundObserver.onCompleted();
return;
}
}
phaser.awaitAdvance(currentPhase);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException(e);
+ } catch (OnErrorException e) {
+ outboundObserver.onError(e.getCause());
Review comment:
Ack. I was just thinking that the general `Exception` catch already calls
`outboundObserver.onError(e)`, so if you cancel the queue with the top-level
exception it would be the same. Or am I misunderstanding what happens with
`getCause`? I'm reading it as getting the original exception that was nested in
the `OnErrorException`, but maybe it's a higher-level exception than that?
Regardless, this is all nit-picking at this point. Not worth blocking
approval on.
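To spell out my reading of `getCause`, a small sketch. The `OnErrorException` below is a hypothetical stand-in with a single cause-taking constructor; the real class in this PR may be defined differently.

```java
class GetCauseSketch {
  /** Hypothetical wrapper; only borrows the name used in the diff. */
  static final class OnErrorException extends Exception {
    OnErrorException(Throwable cause) {
      super(cause); // standard Throwable cause chaining
    }
  }

  // Wraps the original exception, then unwraps it the way the catch block does.
  static Throwable unwrap(Throwable original) {
    try {
      throw new OnErrorException(original);
    } catch (OnErrorException e) {
      // getCause() returns the exact Throwable passed to the constructor.
      return e.getCause();
    }
  }

  public static void main(String[] args) {
    Throwable original = new IllegalStateException("boom");
    System.out.println(unwrap(original) == original);
  }
}
```

Under that assumption, `outboundObserver.onError(e.getCause())` forwards the original exception itself rather than the wrapper.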