gRPC uses HTTP/2's built-in flow control to manage streams automatically. However, if you want fine-grained manual control, the server should disable automatic flow control by calling ServerCallStreamObserver.disableAutoRequest() and then call ServerCallStreamObserver.request(n) to request n messages each time the observer state changes to ready. With n=1 you may not need any buffering in the application, provided there is only one response per request. You can follow the manual flow control server example for Java here: <https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java>.

If you need to queue messages, it looks like your *send* method should just enqueue the message and then call *drainQueue*, which avoids the sequencing problem you mentioned. Your *send* method (called by application code) and your *drainQueue* method (called by gRPC) should also be synchronized when accessing the queue.
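To make the enqueue-then-drain suggestion concrete, here is a minimal sketch. It is not taken from the grpc-java examples. `ReadySink`, `QueuedSender`, and `FakeSink` are hypothetical names: `ReadySink` stands in for the two ServerCallStreamObserver methods the pattern needs (isReady and onNext) so the snippet runs without gRPC on the classpath.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for the isReady()/onNext() pair of ServerCallStreamObserver. */
interface ReadySink<T> {
    boolean isReady();
    void onNext(T value);
}

/** Enqueue-then-drain sender: onNext is only ever called from drainQueue(). */
final class QueuedSender<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final ReadySink<T> sink;

    QueuedSender(ReadySink<T> sink) {
        this.sink = sink;
    }

    /** Called by application threads: always enqueue first, then try to drain. */
    synchronized void send(T msg) {
        queue.add(msg);
        drainQueue(); // the monitor is reentrant, so this nested call is safe
    }

    /** Called by send() and by the on-ready handler; the lock keeps messages in FIFO order. */
    synchronized void drainQueue() {
        while (sink.isReady() && !queue.isEmpty()) {
            sink.onNext(queue.poll());
        }
    }
}

/** Test double that records what was sent and lets the caller flip readiness. */
final class FakeSink implements ReadySink<String> {
    volatile boolean ready;
    final List<String> sent = new ArrayList<>();

    @Override public boolean isReady() { return ready; }
    @Override public void onNext(String value) { sent.add(value); }
}

final class QueuedSenderDemo {
    public static void main(String[] args) {
        FakeSink sink = new FakeSink();
        QueuedSender<String> sender = new QueuedSender<>(sink);
        sender.send("a");      // sink not ready: message stays queued
        sender.send("b");
        sink.ready = true;     // transport becomes ready...
        sender.drainQueue();   // ...and the on-ready handler would fire
        sender.send("c");      // sink ready: sent right away, after a and b
        System.out.println(sink.sent); // prints [a, b, c]
    }
}
```

In a real service, the sink would presumably delegate to the ServerCallStreamObserver and `drainQueue` would be registered with `obs.setOnReadyHandler(...)`, as in your Code A. Because every message goes through the queue and both methods hold the same lock, a message enqueued while the observer is not ready is still visible to the next on-ready callback, and no two threads call onNext at the same time.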
On Mon, Nov 3, 2025 at 11:00 AM Magno Yu <[email protected]> wrote:
> Hi,
>
> I'm implementing unidirectional server-side streaming in Java with
> performance and low latency in mind, using Netty. I don't want to
> implement my own flow control per se; I'm more interested in writing
> thread-safe code within the existing framework. I have a few questions.
>
> 1) Is it important to look at isReady() before calling onNext(..)?
>
> If the answer to 1 is no, then we can stop here.
> If the answer to 1 is yes, then let's consider code A)
>
> Code A)
>
> ConcurrentLinkedQueue<Msg> q;
>
> void start() {
>     obs.setOnReadyHandler(this::drainQueue);
> }
>
> void send(Msg msg) {
>     if (obs.isReady()) {
>         obs.onNext(msg);
>     } else {
>         q.offer(msg);
>     }
> }
>
> // run by grpc threads
> void drainQueue() {
>     while (obs.isReady() && !q.isEmpty()) {
>         obs.onNext(q.poll());
>     }
> }
>
> This pattern isn't correct because:
>
> 1) Consider the case where send is called and isReady is false. Before
> q.offer is executed, the ready state flips from false to true and
> drainQueue runs with no message in the queue. Then q.offer is executed.
> Now my msg is stuck inside the queue.
>
> 2) Also, onNext is called by two threads (my thread and a grpc thread),
> and it's easy to see that my messages can be sent out of order.
>
> What if my thread never calls onNext?
>
> This won't work either, because what if the obs is always ready? Then my
> message will always get stuck in the queue?
>
> So is there some sample or typical pattern for implementing server-side
> streaming within the existing flow control framework? Do I need some
> atomic operation?
>
> Thanks,
> - Mag
