Your client has closed the connection. The gRPC server layer received that and closed the ServerCall it maintains, which is why the later responseObserver.onNext fails with "call is closed". For (presumably) long-lived connections, you should establish a cancellation listener on your current Context from within the initial method, and have it cancel any incomplete futures. Also wrap any asynchronously executed scopes with that Context's wrap, and test Context.current().isCancelled() inside them so that you do not talk to the responseObserver once the call is cancelled. On another async style note, your future.get() with Interrupted/Execution handling would be better expressed as a Futures.transform, run on your responseService, that invokes responseObserver.onNext with the result.
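To make the shape of that advice concrete, here is a minimal sketch that uses only the JDK, so it runs without gRPC or Guava on the classpath. It is a stand-in, not the gRPC API: the hypothetical CallScope plays the role of the per-call Context (a cancelled flag plus a cancellation hook), the hypothetical ResponseSink plays the role of the responseObserver, and CompletableFuture.thenAcceptAsync is swapped in for Futures.transform. All class and method names here are made up for illustration, and error reporting (onError) is left out.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancellationAwareStreaming {

    /** Hypothetical stand-in for StreamObserver<Response>; only onNext is modelled. */
    interface ResponseSink {
        void onNext(String response);
    }

    /** Hypothetical stand-in for the per-call Context: a cancelled flag plus in-flight futures. */
    static final class CallScope {
        private final AtomicBoolean cancelled = new AtomicBoolean();
        private final List<Future<?>> pending = new CopyOnWriteArrayList<>();

        boolean isCancelled() {
            return cancelled.get();
        }

        void track(Future<?> future) {
            pending.add(future);
        }

        void untrack(Future<?> future) {
            pending.remove(future);
        }

        /** Plays the role of the cancellation listener: cancel every incomplete future. */
        void cancel() {
            if (cancelled.compareAndSet(false, true)) {
                for (Future<?> future : pending) {
                    future.cancel(true);
                }
            }
        }
    }

    /** Handles one request without blocking; the response is sent only if the call is still open. */
    static CompletableFuture<Void> handle(String request, CallScope scope, ResponseSink sink,
                                          Executor workers, Executor responders) {
        CompletableFuture<String> work =
                CompletableFuture.supplyAsync(() -> "handled:" + request, workers);
        scope.track(work);
        work.whenComplete((result, error) -> scope.untrack(work));

        // Chained callback instead of future.get(): no Interrupted/Execution handling needed.
        return work.thenAcceptAsync(result -> {
            if (!scope.isCancelled()) {
                sink.onNext(result);
            }
        }, responders);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CallScope scope = new CallScope();
            ResponseSink sink = response -> System.out.println("sent: " + response);

            handle("a", scope, sink, pool, pool).join(); // call still open: prints "sent: handled:a"
            scope.cancel();                              // the client went away
            handle("b", scope, sink, pool, pool).join(); // result is dropped, nothing is printed
        } finally {
            pool.shutdown();
        }
    }
}
```

In the real service the cancelled flag would come from Context.current().isCancelled() inside a scope wrapped by the Context, and the cancel() hook would be registered as a cancellation listener on that Context, as described above.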
Cheers!
-George

On Mon, Jan 28, 2019 at 2:14 AM grpc-noob <[email protected]> wrote:
> Hi folks,
>
> I am pretty noob with gRpc and trying to make bidirectional streaming work
> with ListenableFutures.
>
> My code is as follows:
>
> RequestService requestService = MoreExecutors.listeningDecorator(MyExecutor);
> ResponseService responseService = MoreExecutors.listeningDecorator(MyExecutor);
>
> @Override
> public StreamObserver<Request> ingestEventsStreaming(
>         StreamObserver<Response> responseObserver) {
>     return new StreamObserver<Request>() {
>         @Override
>         public void onNext(Request request) {
>             ListenableFuture<IngestionResponse> listenableFuture =
>                     requestService.submit(new RequestHandlerTask(config));
>
>             listenableFuture.addListener(new Runnable() {
>                 @Override
>                 public void run() {
>                     try {
>                         responseObserver.onNext(listenableFuture.get());
>                     } catch (InterruptedException e) {
>                         LOG.error("Interrupted", e);
>                     } catch (ExecutionException e) {
>                         LOG.error("Exception in task", e.getCause());
>                     }
>                 }
>             }, responseService);
>         }
>
>         @Override
>         public void onError(Throwable t) {
>             LOG.warn("Request Failed");
>         }
>
>         @Override
>         public void onCompleted() {
>             responseObserver.onCompleted();
>         }
>     };
> }
>
> Above code throws following exception:
>
> Exception in thread "pool-3-thread-1" Exception in thread "pool-3-thread-3" java.lang.IllegalStateException: call is closed
>     at com.google.common.base.Preconditions.checkState(Preconditions.java:510)
>     at io.grpc.internal.ServerCallImpl.sendHeaders(ServerCallImpl.java:89)
>     at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:338)
>     at com.paloaltonetworks.apollo2.ingestion.frontend.server.FrontendServer$FrontendService$1$1.run(FrontendServer.java:160)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> It seems that adding listener fails the code.
>
> May I know if I am doing anything wrong? Any help will be much
> appreciated.
>
> Thanks,
>
> --
> You received this message because you are subscribed to the Google Groups
> "grpc.io" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/grpc-io.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/grpc-io/1ae15b1d-c7fc-46d8-affa-c13d10fa0af3%40googlegroups.com
> <https://groups.google.com/d/msgid/grpc-io/1ae15b1d-c7fc-46d8-affa-c13d10fa0af3%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.
