Thanks for the suggestion.  To provide more context, I have full control 
over both the client and the server, and hence the MAX_CONCURRENT_STREAMS 
setting.  One of the factors in setting MAX_CONCURRENT_STREAMS is to ensure 
no one client overwhelms the service (to ensure quality of service).  I 
feel buffering is useful when the downstream service is under duress and is 
not processing the calls fast enough (because of spikes in traffic etc.), 
then instead of cancelling new streams on the client, buffer them until the 
downstream service recovers.  In my case, the downstream services can 
number in several thousands, so maintaining a buffer and count at the 
application level would be redundant as StreamBufferingEncoder already does 
this.  I was able to use AOP to tap into StreamBufferingEncoder and use 
StreamBufferingEncoder.numBufferedStreams() (along with a reasonable 
MAX_CONCURRENT_STREAMS value) to help solve this. 


On Wednesday, June 21, 2017 at 10:59:33 AM UTC-7, Eric Anderson wrote:
>
> On Tue, Jun 20, 2017 at 6:16 PM, R A <[email protected] <javascript:>> 
> wrote:
>
>> To clarify, I use unary calls.  I have a client that connects to 
>> multiple backend servers.  If any of these servers slow down, the 
>> requests will start buffering in StreamBufferingEncoder indefinitely 
>> (once beyond MAX_CONCURRENT_STREAMS).  I want to be able to identify 
>> such a scenario and mark the downstream server unavailable (temporarily) 
>> similar to a circuit breaker logic, if greater than X number of streams are 
>> buffered for that particular server.  That way a slowness in one server 
>> doesn’t affect requests being sent to other servers in the client.
>>
>
> I don't think depending on the buffering is best here; if a server doubled 
> its MAX_CONCURRENT_STREAMS or used MAX_INT, then you would still want to 
> abort RPCs after a point. Instead, it sounds like you should just limit the 
> number of concurrent calls per Channel to something that you find 
> "reasonable". That can be done via an interceptor.
>
> AtomicInteger count = ...;
> public blah interceptCall(blah) {
>   return new LimitingClientCall(next.newCall(blah));
> }
>
> class LimitingClientCall extends ForwardingClientCall {
>   public blah start(listener) {
>     num = count.incrementAndGet();
>     if (num <= LIMIT) {
>       // good case; easy case
>       super.start(new SimpleForwardingListener(listener) {
>         onClose() {
>           count.decrementAndGet();
>           super.onClose();
>         }
>       });
>     }
>     // bad case
>     count.decrementAndGet();
>     // cancel not strictly necessary since call wasn't started, but a 
> good idea
>     super.cancel("Exceeded limit", null);
>      // Throw away all future method calls; assumes delegate() returns 
> 'delegate'
>     delegate = new NoopClientCall();
>     // Doing this last since it can do anything, which may include throwing
>     listener.onClose(Status.CANCELLED.withDescription("Exceeded limit"));
>   }
> }
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/7b6cb79b-7c46-4f21-a645-70dfd9791c03%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to