It sounds like you are blocking in your onNext handler, which prevents other callbacks (like onCancel) from firing. Instead of sleeping in your handler, why don't you make a ScheduledExecutorService and submit a runnable to it to run every 1 second. Submitting to that executor returns a future, which you can call cancel on when you get a callback from the observer. That would avoid blocking, avoid sleeping, and free up your threads to handle other requests.
On Wednesday, January 11, 2017 at 8:51:32 PM UTC-8, [email protected] wrote: > > Hello, > > I'm trying to explore client back pressure for a unary server-streaming > grpc call, implemented in Scala using grpc-java. > > In my test, the server rpc call body loops 10 times, calling onNext(...) > and sleeping for 1 second each time. To detect cancellation, I've > registered a listener with the return from Context.current(), registered > another handler using .setOnCancelHandler (after downcasting to > ServerCallStreamObserver), and checked the value of > serverCallStreamObserver.isCancelled. > > The idea is that the server dribbles out 10 messages over (almost) 10 > seconds, but should stop if a client cancels or drops out at any point. In > my tests, I've had the client invoke the cancellation function immediately > after receiving the first message, and have also manually quit the client > program with Ctrl-C.Regardless, the server behaves the same: the loop > executes 10 times (evidence via log messages), and during the call to > streamObserver.onCompleted(), the cancellation listener fires. The > OnCancelHandler seems to fire after the unary call itself returns (scope of > the bound RPC handler exits). The .isCancelled function always returns > false. > > Now this is surprising to me. I thought that the point of these listeners > and handlers (and the .isCancelled method) was so that the server could > save resources by detecting when a client no longer cared about results. It > doesn't seem to be working. > > Do I need to wait for https://github.com/grpc/grpc-java/issues/339 and > implement on top of that, or do I need to do the "verbose and annoying" > thing described in that PR? I would hate to go through the trouble of the > "verbose and annoying" to find that it's broken in exactly the same way the > OnCancelHandler and CancellationListener mechanics seem broken for me now. > > Or is this some unfortunate consequence of the Scala runtime and some > assumptions of immutability? > > I would love to understand this better to know what code I need to write. > Thanks to any kind souls who have time to enlighten me. > -Daniel > > -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/grpc-io. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/745b4587-3fed-438f-af23-b270631ee8ff%40googlegroups.com. For more options, visit https://groups.google.com/d/optout.
