gRPC leverages HTTP/2's built-in flow control mechanisms to provide
automatic flow control of streams. However if you are interested in
manually doing it for fine-grained control, the server should disable
automatic flow control by calling
ServerCallStreamObserver.disableAutoRequest and call
ServerCallStreamObserver.request(n) to request n messages every time the
observer state changes to ready. If you use n=1 you may not need buffering
at the application if there is only one response per request. You can
follow the Manual flow control server example for Java here
<https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java>
.
If you need to queue messages it looks like your *send* method should just
enqueue the message and then call drain queue, to avoid the sequencing
problem you mentioned. Your *send* method called by application code and
your *drainQueue* method called by grpc should be synchronized in accessing
the queue as well.

On Mon, Nov 3, 2025 at 11:00 AM Magno Yu <[email protected]> wrote:

> Hi,
>
>        I'm implementing a uni directional server side streaming in Java
> with performance/low latency in mind, using Netty. I don't want to
> implement by own flow control per se but I'm more interested to write
> thread safe code given the existing framework. And I got a few questions..
>
> 1) Is it important to look at isReady() before calling onNext(..)?
>
>       If the answer to 1 is no, then we can stop here.
>       If the answer to 1 is yes, then, let's consider code A)
>
> Code A)
>
> ConcurrentLinkedQueue q;
>
> void start() {
>        obs.setOnReadyHandler(this::drainQueue);
> }
>
> void send(Msg msg){
>          if (obs.isReady()){
>                   obs.onNext(msg);
>          } else {
>                   q.offer(msg);
>          }
> }
>
> // run by grpc threads
> void drainQueue(){
>          while(obs.isReady() && !q.isEmpty()){
>               obs.onNext(msg);
>          }
> }
>
>            This pattern isn't correct because:
>
> 1) Consider the case where send is called and isReady is false and now
> before q.offer is executed, the ready is flipped from false to true and
> drainQueue is run with no message in it. Then q.offer is executed. Now my
> msg is stuck inside this queue.
>
> 2) Also onNext is called by two threads (my thread and grpc thread) and
> it's easy to see that my message can be sent out of ordered.
>
>             What if my thread never calls onNext?
>
>             This won't work neither because what if the obs is always
> ready. Then my message will always get stuck in the queue?
>
>               So is there some sample or typical pattern to implement
> server side streaming within the existing flow control framework. Do I need
> some Atomic operation?
>
> Thanks,
> - Mag
>
>
>
> --
> You received this message because you are subscribed to the Google Groups "
> grpc.io" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To view this discussion visit
> https://groups.google.com/d/msgid/grpc-io/e04c8dc8-03b1-4ef0-bfef-b37a300f8b60n%40googlegroups.com
> <https://groups.google.com/d/msgid/grpc-io/e04c8dc8-03b1-4ef0-bfef-b37a300f8b60n%40googlegroups.com?utm_medium=email&utm_source=footer>
> .
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion visit 
https://groups.google.com/d/msgid/grpc-io/CAEBMeGumcBUYEQf1%2B4NzhHN7qKuzOLyMefGATagnEs71kfBq9w%40mail.gmail.com.

Reply via email to