youngoli commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r742306537



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       Ack. I was just thinking that the general `Exception` catch already calls 
`outboundObserver.onError(e)`, so if you cancelled the queue with the top-level 
exception, the result would be the same. Or am I misunderstanding what 
`getCause` does here? I'm reading it as returning the original exception that 
was nested in the `OnErrorException`, but maybe it returns a higher-level 
exception than that?
   
   Regardless, this is all nit-picking at this point and not worth blocking 
approval on.
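
   To make the `getCause` question concrete, here is a minimal sketch of the two 
behaviours being compared. It assumes `OnErrorException` simply wraps the original 
`Throwable` through the standard `Exception(Throwable cause)` constructor. The 
class and method names below are hypothetical stand-ins for illustration, not the 
code in this PR.

```java
// Hypothetical sketch: none of these names come from BufferingStreamObserver.
final class GetCauseSketch {

  // Assumed shape of the wrapper: it only carries the original Throwable as its cause.
  static final class OnErrorException extends Exception {
    OnErrorException(Throwable cause) {
      super(cause);
    }
  }

  // Mirrors the dedicated catch in the diff: what reaches onError is e.getCause().
  static Throwable reportedWithGetCause(Throwable original) {
    try {
      throw new OnErrorException(original);
    } catch (OnErrorException e) {
      return e.getCause();
    }
  }

  // Mirrors a general catch that forwards the caught exception itself.
  static Throwable reportedWithGeneralCatch(Throwable original) {
    try {
      throw new OnErrorException(original);
    } catch (Exception e) {
      return e;
    }
  }

  public static void main(String[] args) {
    Throwable original = new IllegalStateException("upstream failed");
    // Compare what each variant would hand to the outbound observer.
    System.out.println(reportedWithGetCause(original) == original);
    System.out.println(reportedWithGeneralCatch(original) == original);
  }
}
```

   Under that assumption, the dedicated catch hands the observer the original 
exception, while the general catch hands it the wrapper with the original nested 
one level down, so the two are only equivalent if the receiver unwraps the cause 
itself.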



