A possible caveat is that the StreamObserver is not synchronized, and will require the application to provide thread safety if shared across threads. A workaround is to use the ServerCallStreamObserver's onReadyHandler to push messages: https://grpc.io/grpc-java/javadoc/io/grpc/stub/CallStreamObserver.html#setOnReadyHandler-java.lang.Runnable-
On Tue, Jan 30, 2018 at 5:30 AM, prodonjs via grpc.io < [email protected]> wrote: > I'm working on some unit tests for a Bidi Streaming service > implementation. I'm trying to determine if my design is the issue, or if I > just need to do something to guard against the deadlock condition. > > When the client makes the initial request, the StreamObserver > implementation for my service starts a new thread which polls the backend > of my system repeatedly until the stream is closed. Each time it finds new > messages in the backend, it calls the onNext method of the response > StreamObserver to send the message back to the client. In my unit tests, > things work properly most of the time, but sporadically enter a deadlock > situation. I can't seem to understand why, particularly after adding print > statements that seem to indicate that I am not encountering situations > where the two threads are trying to write concurrently to the stream. > > The interaction pattern shown below is what I expect. Client makes > request, server finishes the initial request, then the alternate thread > sends the client a response. Client gets the response and then acks it, > which the server then receives. This is all as it should be, but it somehow > results in the deadlock shown below. > > 1517317747082 Client makes initial request > 1517317747082 Server receives request subscription: > "subscription-1-to-test-topic-1" > 1517317747092 Server finishes with client request > 1517317747105 Server sends response with 6 messages > 1517317747105 Client receives response with 6 messages > 1517317747106 Client makes second request > 1517317747106 Server receives request modify_deadline_seconds: 60 > modify_deadline_ack_ids: "2-0" > 1517317747110 Server finishes with client request > > > "subscription-1-to-test-topic-1-0": > waiting to lock monitor 0x00007f2fa0005248 (object 0x00000005cf7e5ee0, a > io.grpc.inprocess.InProcessTransport$InProcessStream$ > InProcessClientStream), > which is held by "main" > "main": > waiting to lock monitor 0x00007f2fa0003b98 (object 0x00000005cf7e61e0, a > io.grpc.inprocess.InProcessTransport$InProcessStream$ > InProcessServerStream), > which is held by "subscription-1-to-test-topic-1-0" > Java stack information for the threads listed above: > =================================================== > "subscription-1-to-test-topic-1-0": > at io.grpc.inprocess.InProcessTransport$InProcessStream$ > InProcessClientStream.request(InProcessTransport.java:516) > - waiting to lock <0x00000005cf7e5ee0> (a io.grpc.inprocess. > InProcessTransport$InProcessStream$InProcessClientStream) > at io.grpc.internal.ClientCallImpl.request(ClientCallImpl.java:383) > at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37) > at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37) > at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter. > request(ClientCalls.java:356) > at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAd > apter.onMessage(ClientCalls.java:410) > at io.grpc.ForwardingClientCallListener.onMessage( > ForwardingClientCallListener.java:36) > at io.grpc.ForwardingClientCallListener.onMessage( > ForwardingClientCallListener.java:36) > at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$ > 1MessagesAvailable.runInContext(ClientCallImpl.java:530) > at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute( > SerializeReentrantCallsDirectExecutor.java:49) > at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl. > messagesAvailable(ClientCallImpl.java:547) > at io.grpc.inprocess.InProcessTransport$InProcessStream$ > InProcessServerStream.writeMessage(InProcessTransport.java:378) > - locked <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$ > InProcessStream$InProcessServerStream) > at io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:134) > at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl. > onNext(ServerCalls.java:341) > at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$ > StreamingPullStreamObserver.streamMessages(SubscriberImpl.java:372) > at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$ > StreamingPullStreamObserver$$Lambda$13/179779934.run(Unknown Source) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > "main": > at io.grpc.inprocess.InProcessTransport$InProcessStream$ > InProcessServerStream.request(InProcessTransport.java:325) > - waiting to lock <0x00000005cf7e61e0> (a io.grpc.inprocess. > InProcessTransport$InProcessStream$InProcessServerStream) > at io.grpc.internal.ServerCallImpl.request(ServerCallImpl.java:78) > at io.grpc.stub.ServerCalls$StreamingServerCallHandler$ > StreamingServerCallListener.onMessage(ServerCalls.java:252) > at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl. > messagesAvailable(ServerCallImpl.java:252) > at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerS > treamListener$1MessagesAvailable.runInContext(ServerImpl.java:626) > at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute( > SerializeReentrantCallsDirectExecutor.java:49) > at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerS > treamListener.messagesAvailable(ServerImpl.java:637) > at io.grpc.inprocess.InProcessTransport$InProcessStream$ > InProcessClientStream.writeMessage(InProcessTransport.java:564) > - locked <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$ > InProcessStream$InProcessClientStream) > at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:438) > at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52) > at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52) > at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter. > onNext(ClientCalls.java:320) > at com.google.cloud.partners.pubsub.kafka.SubscriberImplTest. > streamingPullSingleClientPullModifyAndAck(SubscriberImplTest.java:548) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall( > FrameworkMethod.java:50) > at org.junit.internal.runners.model.ReflectiveCallable.run( > ReflectiveCallable.java:12) > at org.junit.runners.model.FrameworkMethod.invokeExplosively( > FrameworkMethod.java:47) > at org.junit.internal.runners.statements.InvokeMethod. > evaluate(InvokeMethod.java:17) > at org.junit.internal.runners.statements.RunBefores. > evaluate(RunBefores.java:26) > at org.junit.internal.runners.statements.RunAfters.evaluate( > RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > > I've yet to see an example similar to mine where the streaming server > handler uses a separate thread to generate and send messages back to the > client, so that's why I'm curious as to whether that is my primary issue or > not. I appreciate any help and thanks in advance. > > -- > You received this message because you are subscribed to the Google Groups " > grpc.io" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at https://groups.google.com/group/grpc-io. > To view this discussion on the web visit https://groups.google.com/d/ > msgid/grpc-io/d1df2ba7-fab1-4aa4-a8bd-95e40ded3217%40googlegroups.com > <https://groups.google.com/d/msgid/grpc-io/d1df2ba7-fab1-4aa4-a8bd-95e40ded3217%40googlegroups.com?utm_medium=email&utm_source=footer> > . > For more options, visit https://groups.google.com/d/optout. > -- Spencer Fang -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/grpc-io. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/CAK%3D-x_4qGt-QyjFO4vh5L00%2B6PFrwfJQcAjaji3RrwMxZ%3DH9_Q%40mail.gmail.com. For more options, visit https://groups.google.com/d/optout.
