sorry for the formatting, let me try again:
I have the following server-streaming method:

> public void multiEcho(EchoRequest verbalVomit,
>         StreamObserver<EchoResposne> responseObserver) {
>     log.fine("someone has just emitted an inconsiderated verbal vomit");
>     var callMonitor = new Object();
>     var echoObserver = (ServerCallStreamObserver<EchoResposne>) responseObserver;
>     echoObserver.setOnReadyHandler(() -> {
>         log.finer("sink ready");
>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>     });
>     echoObserver.setOnCancelHandler(() -> {
>         log.fine("client cancelled the call 1");
>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>     });
>
>     try {
>         for (int i = 1; i <= verbalVomit.getReps(); i++) {
>             if (echoObserver.isCancelled()) {
>                 log.fine("client cancelled the call 2");
>                 return;
>             }
>             synchronized (callMonitor) {
>                 while( ! echoObserver.isReady()) {
>                     log.finer("sink clogged at rep " + i);
>                     callMonitor.wait();
>                 }
>             }
>
>             // multiply the content to fill the buffer faster
>             var echoBuilder = new StringBuilder();
>             for (int j = 0; j < MULTIPLY_FACTOR; j++) {
>                 echoBuilder.append(verbalVomit.getInconsideratedVerbalVomit());
>             }
>             var echoedVomit =
>                 EchoResposne.newBuilder().setEchoedVomit(echoBuilder.toString()).build();
>
>             if (log.isLoggable(Level.FINEST)) log.finest("echo");
>             echoObserver.onNext(echoedVomit);
>         }
>         echoObserver.onCompleted();
>     } catch (StatusRuntimeException e) {
>         if (e.getStatus().getCode() == Code.CANCELLED) {
>             log.fine("client cancelled the call 3");
>         } else {
>             log.severe("server error: " + e);
>             e.printStackTrace();
>         }
>     } catch (Exception e) {
>         log.severe("server error: " + e);
>         e.printStackTrace();
>         echoObserver.onError(Status.INTERNAL.withCause(e).asException());
>     }
> }

I create the server this way:

> echoServer = NettyServerBuilder
>     .forPort(port)
>     .maxConnectionAge(10, TimeUnit.MINUTES)
>     .maxConnectionAgeGrace(12, TimeUnit.HOURS)
>     .addService(new EchoService())
>     .build();

and the client looks like this:

> var connector = EchoServiceGrpc.newBlockingStub(channel);
> var request = EchoRequest
>     .newBuilder()
>     .setInconsideratedVerbalVomit(
>         "bleeeeeeeeeeeeeeeeeeeehhhhhhhhhh" +
>         "hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh")  // 64B
>     .setReps(100)
>     .build();
> var vomitIterator = connector.multiEcho(request);
> while (vomitIterator.hasNext()) {
>     vomitIterator.next();
>     System.out.println("got echo");
> }

and it dead-locks after just a few messages  ;-]

server output looks like this:

> started gRPC EchoServer on port 6666
> Jun 05, 2021 8:22:52 P.M. pl.morgwai.samples.grpc.deadlock.EchoService multiEcho
> FINE: someone has just emitted an inconsiderated verbal vomit
> Jun 05, 2021 8:22:53 P.M. pl.morgwai.samples.grpc.deadlock.EchoService multiEcho
> FINER: sink clogged at rep 7

...and clients:

> got echo
> got echo
> got echo
> got echo
> got echo
> got echo

...and they both hang indefinitely :(

am I doing something wrong or is it a bug?

The interesting part is that if I use a direct executor in the server and
dispatch the work from the method to a separate executor, then it works
without any problems (that's what I usually do, so I've never encountered
this problem before), i.e.:

> public void multiEcho(EchoRequest verbalVomit,
>         StreamObserver<EchoResposne> responseObserver) {
>     log.fine("someone has just emitted an inconsiderated verbal vomit");
>     var callMonitor = new Object();
>     var echoObserver = (ServerCallStreamObserver<EchoResposne>) responseObserver;
>     echoObserver.setOnReadyHandler(() -> {
>         log.finer("sink ready");
>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>     });
>     echoObserver.setOnCancelHandler(() -> {
>         log.fine("client cancelled the call 1");
>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>     });
>
>     cpuIntensiveOpExecutor.execute(() -> {
>         try {
>             for (int i = 1; i <= verbalVomit.getReps(); i++) {
(...)

and

> echoServer = NettyServerBuilder
>     .forPort(port)
>     .maxConnectionAge(10, TimeUnit.MINUTES)
>     .maxConnectionAgeGrace(12, TimeUnit.HOURS)
>     .addService(new EchoService())
>     .directExecutor()
>     .build();
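
To make the difference between the two variants easier to reason about, here is a gRPC-free sketch. It rests on an assumption that is not confirmed anywhere in this message: that all callbacks of a single call (the method invocation, the on-ready handler, the on-cancel handler) are delivered one at a time, so a callback that blocks keeps the next one from running. The class CallbackHandoffSketch, the method runCall and both executors are hypothetical stand-ins, not gRPC API; a single-thread executor plays the role of the serialized callback delivery.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in, NOT gRPC code. Assumption: a call's callbacks run
// one at a time; the single-thread callbackExecutor models that.
class CallbackHandoffSketch {

    /** Returns true if the simulated call finished within timeoutMillis. */
    static boolean runCall(boolean handOffToWorker, long timeoutMillis) {
        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Object callMonitor = new Object();
        boolean[] ready = {false};  // guarded by callMonitor
        CountDownLatch done = new CountDownLatch(1);

        // Same shape as the loop in multiEcho: wait until the sink is ready.
        Runnable sendLoop = () -> {
            synchronized (callMonitor) {
                while (!ready[0]) {
                    try {
                        callMonitor.wait();
                    } catch (InterruptedException e) {
                        return;  // shut down before the sink became ready
                    }
                }
            }
            done.countDown();
        };

        try {
            // First callback: the "method invocation". It either blocks in
            // place or hands the loop to the worker and returns at once.
            callbackExecutor.execute(() -> {
                if (handOffToWorker) worker.execute(sendLoop);
                else sendLoop.run();
            });
            // Second callback: the "on-ready handler", queued behind the first.
            callbackExecutor.execute(() -> {
                synchronized (callMonitor) {
                    ready[0] = true;
                    callMonitor.notifyAll();
                }
            });
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            callbackExecutor.shutdownNow();
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking in callback finished: " + runCall(false, 500));
        System.out.println("handed off to worker finished: " + runCall(true, 500));
    }
}
```

If the assumption holds, the blocking variant can never be woken up, because the notification is queued behind the very callback that is waiting for it, while the hand-off variant returns immediately and lets the notification through. Whether gRPC really behaves this way is exactly the open question here.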

A full working example (dead-locking that is) can be found on github: 
https://github.com/morgwai/grpc-deadlock

Any hints will be much appreciated :)

Thanks!


On Saturday, June 5, 2021 at 9:49:51 PM UTC+7 Piotr Morgwai Kotarbinski 
wrote:

