[ 
https://issues.apache.org/jira/browse/BEAM-3811?focusedWorklogId=675766&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-675766
 ]

ASF GitHub Bot logged work on BEAM-3811:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Nov/21 01:26
            Start Date: 04/Nov/21 01:26
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r742089863



##########
File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       The main reason is to pass through the exception from the caller so that 
I know that this is the case where we want to pull out the cause and not pass 
through the top level exception.
   
   Avoiding another cancel doesn't really do much as you already mentioned.

##########
File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> 
implements StreamObserver<T> {

Review comment:
       Yes.
   
   The gRPC StreamObservers are typically not threadsafe across multiple 
threads so either you need to have threads synchronize or use a queue and have 
a dedicated thread writing.

##########
File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> 
implements StreamObserver<T> {

Review comment:
       Yes.
   
   The gRPC StreamObservers are not threadsafe across multiple threads so 
either you need to have threads synchronize or use a queue and have a dedicated 
thread writing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 675766)
    Time Spent: 3h 20m  (was: 3h 10m)

> Add a CancellableQueue to make close methods safer
> --------------------------------------------------
>
>                 Key: BEAM-3811
>                 URL: https://issues.apache.org/jira/browse/BEAM-3811
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Thomas Groh
>            Priority: P3
>              Labels: portability
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> There are multiple locations in which, on a call to {{close}} within some 
> portability service, we wish to close all of our outstanding clients. 
> However, the call to {{close}} can be interleaved with calls to the method 
> which creates a new client. Without having an explicit thread-safe signal 
> from the collection of pending clients, there's no way to ensure that all of 
> the clients for a service are closed when that service is closed. 
>  
> As a result, some clients are only terminated by the forceful termination of 
> the server, killing the connection.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to