I was trying this with grpc-1.38.0 on openjdk-11 on ubuntu-18.04 in case it 
matters.


On 05/06/2021 21:49, Piotr Morgwai Kotarbinski wrote:
> Hi all,
> I have the following server-streaming method:
> 
>> public void multiEcho(EchoRequest verbalVomit, StreamObserver<EchoResposne> 
>> responseObserver) {
>>     log.fine("someone has just emitted an inconsiderated verbal vomit");
>>     var callMonitor = new Object();
>>     var echoObserver = (ServerCallStreamObserver<EchoResposne>) 
>> responseObserver;
>>     echoObserver.setOnReadyHandler(() -> {
>>         log.finer("sink ready");
>>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>>     });
>>     echoObserver.setOnCancelHandler(() -> {
>>         log.fine("client cancelled the call 1");
>>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>>     });
>>
>>     try {
>>         for (int i = 1; i <= verbalVomit.getReps(); i++) {
>>             if (echoObserver.isCancelled()) {
>>                 log.fine("client cancelled the call 2");
>>                 return;
>>             }
>>             synchronized (callMonitor) {
>>                 while( ! echoObserver.isReady()) {
>>                     log.finer("sink clogged at rep " + i);
>>                     callMonitor.wait();
>>                 }
>>             }
>>
>>             // multiply the content to fill the buffer faster
>>             var echoBuilder = new StringBuilder();
>>             for (int j = 0; j < MULTIPLY_FACTOR; j++) {
>>                 
>> echoBuilder.append(verbalVomit.getInconsideratedVerbalVomit());
>>             }
>>             var echoedVomit =
>>                 
>> EchoResposne.newBuilder().setEchoedVomit(echoBuilder.toString()).build();
>>
>>             if (log.isLoggable(Level.FINEST)) log.finest("echo");
>>             echoObserver.onNext(echoedVomit);
>>         }
>>         echoObserver.onCompleted();
>>     } catch (StatusRuntimeException e) {
>>         if (e.getStatus().getCode() == Code.CANCELLED) {
>>             log.fine("client cancelled the call 3");
>>         } else {
>>             log.severe("server error: " + e);
>>             e.printStackTrace();
>>         }
>>     } catch (Exception e) {
>>         log.severe("server error: " + e);
>>         e.printStackTrace();
>>         echoObserver.onError(Status.INTERNAL.withCause(e).asException());
>>     }
>> }
> 
> I create the server this way:
> 
>> echoServer = NettyServerBuilder
>>      .forPort(port)
>>      .maxConnectionAge(10, TimeUnit.MINUTES)
>>      .maxConnectionAgeGrace(12, TimeUnit.HOURS)
>>      .addService(new EchoService())
>>      .build();
> 
> and the client looks like this:
> 
>> var connector = EchoServiceGrpc.newBlockingStub(channel);
>> var request = EchoRequest
>>      .newBuilder()
>>      .setInconsideratedVerbalVomit(
>>              "bleeeeeeeeeeeeeeeeeeeehhhhhhhhhh" +
>>              "hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh")  // 64B
>>      .setReps(100)
>>      .build();
>> var vomitIterator = connector.multiEcho(request);
>> while (vomitIterator.hasNext()) {
>>      vomitIterator.next();
>>      System.out.println("got echo");
>> }
> 
> and it dead-locks after just a few messages  ;-]
> 
> server output looks like this:
> 
>> started gRPC EchoServer on port 6666
>> Jun 05, 2021 8:22:52 P.M. pl.morgwai.samples.grpc.deadlock.EchoService 
>> multiEcho
>> FINE: someone has just emitted an inconsiderated verbal vomit
>> Jun 05, 2021 8:22:53 P.M. pl.morgwai.samples.grpc.deadlock.EchoService 
>> multiEcho
>> FINER: sink clogged at rep 7
> 
> ...and clients:
> 
>> got echo
>> got echo
>> got echo
>> got echo
>> got echo
>> got echo
> 
> ...and they both hand indefinitely :(
> 
> am I doing something wrong or is it a bug?
> 
> The interesting part is that if I use direct executor in the server and 
> dispatch the work to a separate executor, then it works without any problems 
> (that's what I usually do, so I've never encountered this problem before).
> ie: 
> 
>> public void multiEcho(EchoRequest verbalVomit, StreamObserver<EchoResposne> 
>> responseObserver) {
>>     log.fine("someone has just emitted an inconsiderated verbal vomit");
>>     var callMonitor = new Object();
>>     var echoObserver = (ServerCallStreamObserver<EchoResposne>) 
>> responseObserver;
>>     echoObserver.setOnReadyHandler(() -> {
>>         log.finer("sink ready");
>>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>>     });
>>     echoObserver.setOnCancelHandler(() -> {
>>         log.fine("client cancelled the call 1");
>>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>>     });
>>
>>     cpuIntensiveOpExecutor.execute(() -> {
>>         try {
>>             for (int i = 1; i <= verbalVomit.getReps(); i++) {
> (...)
> 
> and
> 
>> echoServer = NettyServerBuilder
>>      .forPort(port)
>>      .maxConnectionAge(10, TimeUnit.MINUTES)
>>      .maxConnectionAgeGrace(12, TimeUnit.HOURS)
>>      .addService(new EchoService())
>>      .directExecutor()
>>      .build();
> 
> A full working example (dead-locking that is) can be found on github: 
> https://github.com/morgwai/grpc-deadlock
> 
> Any hints will be much appreciated :)
> 
> Thanks!
> 

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/9bf1b6d0-b62f-5617-ccef-0a058dc6341b%40gmail.com.

Reply via email to