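Are you using a direct executor? Both stacks in your thread dump go through SerializeReentrantCallsDirectExecutor, which suggests the listener callbacks are running inline on the thread that is writing, while that thread still holds the stream's lock. If so, switching to a normal (thread-pool) executor should avoid the deadlock.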
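To illustrate why the executor matters, here is a minimal plain-Java sketch. It is not gRPC code: FakeStream, ExecutorLockDemo and their methods are hypothetical stand-ins for the two in-process streams in the thread dump. It only assumes that writeMessage() takes the stream's monitor and then hands the listener callback to an executor, as the trace suggests.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ExecutorLockDemo {

    /** Hypothetical stand-in for one side of an in-process stream; not a gRPC class. */
    static final class FakeStream {
        private final Executor callbackExecutor;

        FakeStream(Executor callbackExecutor) {
            this.callbackExecutor = callbackExecutor;
        }

        /** Takes this stream's monitor, then hands the listener callback to the executor. */
        synchronized void writeMessage(Runnable listenerCallback) {
            callbackExecutor.execute(listenerCallback);
        }

        /** Also needs this stream's monitor, like request() in the thread dump. */
        synchronized void request() {
            // Nothing to do; acquiring the monitor is the point.
        }
    }

    /** Returns true if the callback ran while the writer's monitor was still held. */
    static boolean callbackHoldsWriterLock(Executor executor) {
        FakeStream server = new FakeStream(executor);
        FakeStream client = new FakeStream(executor);
        AtomicBoolean held = new AtomicBoolean();
        CountDownLatch done = new CountDownLatch(1);

        server.writeMessage(() -> {
            // Does the thread running the callback still own the server stream's lock?
            held.set(Thread.holdsLock(server));
            // The callback then needs the other stream's lock (nested if held is true).
            client.request();
            done.countDown();
        });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback did not run");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return held.get();
    }

    /** Direct executor: the callback runs inline on the writing thread. */
    static boolean withDirectExecutor() {
        return callbackHoldsWriterLock(Runnable::run);
    }

    /** Pooled executor: the callback runs later on a separate thread. */
    static boolean withPooledExecutor() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return callbackHoldsWriterLock(pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct executor, callback under writer lock: " + withDirectExecutor());
        System.out.println("pooled executor, callback under writer lock: " + withPooledExecutor());
    }
}
```

With the direct executor the callback runs while the writer's lock is held, so two threads writing in opposite directions can each end up waiting for the other's lock, which matches the dump. With a pooled executor the callback runs without that lock. In grpc-java terms this would mean not calling directExecutor() on the in-process server and channel builders in the test, assuming that is how the test is currently set up.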
On Tuesday, January 30, 2018 at 5:30:05 AM UTC-8, [email protected] wrote:
>
> I'm working on some unit tests for a Bidi Streaming service
> implementation. I'm trying to determine if my design is the issue, or if I
> just need to do something to guard against the deadlock condition.
>
> When the client makes the initial request, the StreamObserver
> implementation for my service starts a new thread which polls the backend
> of my system repeatedly until the stream is closed. Each time it finds new
> messages in the backend, it calls the onNext method of the response
> StreamObserver to send the message back to the client. In my unit tests,
> things work properly most of the time, but sporadically enter a deadlock
> situation. I can't seem to understand why, particularly after adding print
> statements that seem to indicate that I am not encountering situations
> where the two threads are trying to write concurrently to the stream.
>
> The interaction pattern shown below is what I expect. Client makes
> request, server finishes the initial request, then the alternate thread
> sends the client a response. Client gets the response and then acks it,
> which the server then receives. This is all as it should be, but it somehow
> results in the deadlock shown below.
>
> 1517317747082 Client makes initial request
> 1517317747082 Server receives request subscription: "subscription-1-to-test-topic-1"
> 1517317747092 Server finishes with client request
> 1517317747105 Server sends response with 6 messages
> 1517317747105 Client receives response with 6 messages
> 1517317747106 Client makes second request
> 1517317747106 Server receives request modify_deadline_seconds: 60 modify_deadline_ack_ids: "2-0"
> 1517317747110 Server finishes with client request
>
>
> "subscription-1-to-test-topic-1-0":
>   waiting to lock monitor 0x00007f2fa0005248 (object 0x00000005cf7e5ee0, a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream),
>   which is held by "main"
> "main":
>   waiting to lock monitor 0x00007f2fa0003b98 (object 0x00000005cf7e61e0, a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream),
>   which is held by "subscription-1-to-test-topic-1-0"
>
> Java stack information for the threads listed above:
> ===================================================
> "subscription-1-to-test-topic-1-0":
>     at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.request(InProcessTransport.java:516)
>     - waiting to lock <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
>     at io.grpc.internal.ClientCallImpl.request(ClientCallImpl.java:383)
>     at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
>     at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
>     at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.request(ClientCalls.java:356)
>     at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:410)
>     at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
>     at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
>     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:530)
>     at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
>     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.messagesAvailable(ClientCallImpl.java:547)
>     at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.writeMessage(InProcessTransport.java:378)
>     - locked <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
>     at io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:134)
>     at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
>     at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver.streamMessages(SubscriberImpl.java:372)
>     at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver$$Lambda$13/179779934.run(Unknown Source)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> "main":
>     at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.request(InProcessTransport.java:325)
>     - waiting to lock <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
>     at io.grpc.internal.ServerCallImpl.request(ServerCallImpl.java:78)
>     at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:252)
>     at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:252)
>     at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:626)
>     at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
>     at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.messagesAvailable(ServerImpl.java:637)
>     at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.writeMessage(InProcessTransport.java:564)
>     - locked <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
>     at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:438)
>     at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
>     at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
>     at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:320)
>     at com.google.cloud.partners.pubsub.kafka.SubscriberImplTest.streamingPullSingleClientPullModifyAndAck(SubscriberImplTest.java:548)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>
> I've yet to see an example similar to mine where the streaming server
> handler uses a separate thread to generate and send messages back to the
> client, so that's why I'm curious as to whether that is my primary issue or
> not. I appreciate any help and thanks in advance.
>
