Thanks for the suggestion. For more context: I have full control over both the client and the server, and hence over the MAX_CONCURRENT_STREAMS setting. One factor in choosing MAX_CONCURRENT_STREAMS is ensuring that no single client overwhelms the service (to preserve quality of service). Buffering is useful when the downstream service is under duress and not processing calls fast enough (because of traffic spikes, etc.): instead of cancelling new streams on the client, they are buffered until the downstream service recovers. In my case, the downstream services can number in the thousands, so maintaining a buffer and count at the application level would be redundant, since StreamBufferingEncoder already does this. I was able to use AOP to tap into StreamBufferingEncoder and use StreamBufferingEncoder.numBufferedStreams() (along with a reasonable MAX_CONCURRENT_STREAMS value) to solve this.
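For anyone wanting to do something similar, here is a minimal sketch of the per-server check described above. The class and method names (BufferedStreamBreaker, record, isTripped) are hypothetical; the count passed to record() would come from StreamBufferingEncoder.numBufferedStreams(), sampled via whatever AOP hook you set up:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical per-server breaker: trips a server when its last sampled
 * buffered-stream count exceeds a threshold, so that slowness in one
 * server doesn't affect requests sent to other servers.
 */
class BufferedStreamBreaker {
    private final int maxBuffered;
    private final Map<String, AtomicInteger> buffered = new ConcurrentHashMap<>();

    BufferedStreamBreaker(int maxBuffered) {
        this.maxBuffered = maxBuffered;
    }

    /** Record the latest buffered-stream count sampled for a server. */
    void record(String server, int numBufferedStreams) {
        buffered.computeIfAbsent(server, s -> new AtomicInteger())
                .set(numBufferedStreams);
    }

    /** True if this server should be temporarily marked unavailable. */
    boolean isTripped(String server) {
        AtomicInteger n = buffered.get(server);
        return n != null && n.get() > maxBuffered;
    }
}
```

With a threshold of, say, 100, a server sampled at 120 buffered streams would be marked unavailable, while servers below the threshold (or never sampled) stay available.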
On Wednesday, June 21, 2017 at 10:59:33 AM UTC-7, Eric Anderson wrote:
>
> On Tue, Jun 20, 2017 at 6:16 PM, R A <[email protected]> wrote:
>
>> To clarify, I use unary calls. I have a client that connects to
>> multiple backend servers. If any of these servers slow down, the
>> requests will start buffering in StreamBufferingEncoder indefinitely
>> (once beyond MAX_CONCURRENT_STREAMS). I want to be able to identify
>> such a scenario and mark the downstream server unavailable (temporarily),
>> similar to circuit-breaker logic, if greater than X number of streams are
>> buffered for that particular server. That way slowness in one server
>> doesn't affect requests being sent to other servers in the client.
>
> I don't think depending on the buffering is best here; if a server doubled
> its MAX_CONCURRENT_STREAMS or used MAX_INT, then you would still want to
> abort RPCs after a point. Instead, it sounds like you should just limit the
> number of concurrent calls per Channel to something that you find
> "reasonable". That can be done via an interceptor.
>
> AtomicInteger count = ...;
>
> public blah interceptCall(blah) {
>   return new LimitingClientCall(next.newCall(blah));
> }
>
> class LimitingClientCall extends ForwardingClientCall {
>   public blah start(listener) {
>     num = count.incrementAndGet();
>     if (num <= LIMIT) {
>       // good case; easy case
>       super.start(new SimpleForwardingListener(listener) {
>         onClose() {
>           count.decrementAndGet();
>           super.onClose();
>         }
>       });
>       return;
>     }
>     // bad case
>     count.decrementAndGet();
>     // cancel not strictly necessary since call wasn't started, but a good idea
>     super.cancel("Exceeded limit", null);
>     // Throw away all future method calls; assumes delegate() returns 'delegate'
>     delegate = new NoopClientCall();
>     // Doing this last since it can do anything, which may include throwing
>     listener.onClose(Status.CANCELLED.withDescription("Exceeded limit"));
>   }
> }
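The admission logic at the core of that interceptor sketch can be isolated into a small, concrete class. This is a sketch under assumptions (CallLimiter, onStart, and onClose are hypothetical names, not grpc-java API); onStart() would be called from ClientCall.start() and onClose() from the forwarding Listener.onClose(), exactly as in the pseudocode above:

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Counts in-flight calls: increment on start, reject and undo the
 * increment when the limit is exceeded, decrement when a call closes.
 */
class CallLimiter {
    private final int limit;
    private final AtomicInteger inFlight = new AtomicInteger();

    CallLimiter(int limit) {
        this.limit = limit;
    }

    /** Called when a call starts; false means the call must be cancelled. */
    boolean onStart() {
        if (inFlight.incrementAndGet() <= limit) {
            return true;
        }
        inFlight.decrementAndGet(); // undo: this call is being rejected
        return false;
    }

    /** Called from Listener.onClose() for calls that were admitted. */
    void onClose() {
        inFlight.decrementAndGet();
    }
}
```

The undo-on-reject step matters: without it, rejected calls would permanently inflate the counter and eventually block all traffic, which is why the pseudocode above decrements in the bad case before cancelling.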
