I'm working on some unit tests for a bidi streaming service implementation. I'm trying to determine whether my design is the issue, or whether I just need to do something to guard against a deadlock condition.

When the client makes the initial request, the StreamObserver implementation for my service starts a new thread which polls the backend of my system repeatedly until the stream is closed. Each time it finds new messages in the backend, it calls the onNext method of the response StreamObserver to send the message back to the client.

In my unit tests, things work properly most of the time, but sporadically enter a deadlock situation. I can't seem to understand why, particularly after adding print statements that seem to indicate that I am not encountering situations where the two threads are trying to write concurrently to the stream.

The interaction pattern shown below is what I expect. Client makes request, server finishes the initial request, then the alternate thread sends the client a response. Client gets the response and then acks it, which the server then receives. This is all as it should be, but it somehow results in the deadlock shown below.

1517317747082 Client makes initial request
1517317747082 Server receives request subscription: "subscription-1-to-test-topic-1"
1517317747092 Server finishes with client request
1517317747105 Server sends response with 6 messages
1517317747105 Client receives response with 6 messages
1517317747106 Client makes second request
1517317747106 Server receives request modify_deadline_seconds: 60 modify_deadline_ack_ids: "2-0"
1517317747110 Server finishes with client request

"subscription-1-to-test-topic-1-0":
  waiting to lock monitor 0x00007f2fa0005248 (object 0x00000005cf7e5ee0, a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream),
  which is held by "main"
"main":
  waiting to lock monitor 0x00007f2fa0003b98 (object 0x00000005cf7e61e0, a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream),
  which is held by "subscription-1-to-test-topic-1-0"

Java stack information for the threads listed above:
===================================================
"subscription-1-to-test-topic-1-0":
        at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.request(InProcessTransport.java:516)
        - waiting to lock <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
        at io.grpc.internal.ClientCallImpl.request(ClientCallImpl.java:383)
        at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
        at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
        at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.request(ClientCalls.java:356)
        at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:410)
        at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
        at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:530)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.messagesAvailable(ClientCallImpl.java:547)
        at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.writeMessage(InProcessTransport.java:378)
        - locked <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
        at io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:134)
        at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
        at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver.streamMessages(SubscriberImpl.java:372)
        at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver$$Lambda$13/179779934.run(Unknown Source)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
"main":
        at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.request(InProcessTransport.java:325)
        - waiting to lock <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
        at io.grpc.internal.ServerCallImpl.request(ServerCallImpl.java:78)
        at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:252)
        at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:252)
        at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:626)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
        at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.messagesAvailable(ServerImpl.java:637)
        at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.writeMessage(InProcessTransport.java:564)
        - locked <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
        at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:438)
        at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
        at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
        at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:320)
        at com.google.cloud.partners.pubsub.kafka.SubscriberImplTest.streamingPullSingleClientPullModifyAndAck(SubscriberImplTest.java:548)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)

I've yet to see an example similar to mine where the streaming server handler uses a separate thread to generate and send messages back to the client, so that's why I'm curious as to whether that is my primary issue or not. I appreciate any help and thanks in advance.
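
For reference, the shape of the thread dump (each thread holds one stream's monitor and waits for the other's) can be reproduced without gRPC at all. The following is only a sketch using the JDK. The class name, the thread names and the two plain Object monitors are hypothetical stand-ins for InProcessClientStream and InProcessServerStream, and it makes no claim about how the in-process transport is actually implemented. A latch forces the unlucky interleaving that my tests only hit sporadically.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; not part of gRPC or of my service.
class LockOrderDemo {

    // Locks `first`, waits until the other worker also holds its first
    // monitor, then asks for `second`. With opposite orders, both block.
    private static Thread worker(String name, Object first, Object second,
                                 CountDownLatch bothHoldFirst) {
        Thread t = new Thread(() -> {
            synchronized (first) {
                bothHoldFirst.countDown();
                try {
                    bothHoldFirst.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                synchronized (second) {
                    // Not reached once the two workers are deadlocked.
                }
            }
        }, name);
        t.setDaemon(true); // let the JVM exit despite the stuck threads
        return t;
    }

    // Returns the ids of the threads the JVM reports as monitor-deadlocked,
    // or an empty array if none are reported within roughly five seconds.
    static long[] provokeDeadlock() throws InterruptedException {
        Object clientStream = new Object(); // stand-in monitor (assumption)
        Object serverStream = new Object(); // stand-in monitor (assumption)
        CountDownLatch bothHoldFirst = new CountDownLatch(2);

        // "poller" mimics the server-side thread: server monitor, then client.
        worker("poller", serverStream, clientStream, bothHoldFirst).start();
        // "test-main" mimics the test thread: client monitor, then server.
        worker("test-main", clientStream, serverStream, bothHoldFirst).start();

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        for (int i = 0; i < 500; i++) {
            long[] ids = mx.findMonitorDeadlockedThreads();
            if (ids != null) {
                return ids;
            }
            Thread.sleep(10);
        }
        return new long[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("deadlocked threads: " + provokeDeadlock().length);
    }
}
```

If this is a fair model of what is happening, then neither thread needs to be writing to the same stream at the same moment for the deadlock to occur; it is enough that each one takes the two monitors in the opposite order.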
