Are you using a direct executor? If so, switching to a normal executor should 
avoid the deadlock.
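
Going by the trace, each thread calls writeMessage() on one in-process 
stream, holds that stream's monitor, and then runs the peer's listener inline 
through SerializeReentrantCallsDirectExecutor. That listener calls request() 
on the other stream, which needs the monitor the other thread already holds. 
Below is a minimal sketch of that effect in plain Java, not grpc-java code: 
Stream, writeMessage and callbackRunsUnderLock are hypothetical names made up 
for illustration. It only shows that a direct executor runs the callback while 
the writer's lock is still held, and a pooled executor does not.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DirectExecutorDemo {

    /** Hypothetical stand-in for a transport stream that guards writes with its own monitor. */
    static final class Stream {
        private final Executor callbackExecutor;

        Stream(Executor callbackExecutor) {
            this.callbackExecutor = callbackExecutor;
        }

        /** Holds this stream's monitor while handing the listener callback to the executor. */
        synchronized void writeMessage(Runnable listenerCallback) {
            callbackExecutor.execute(listenerCallback);
        }
    }

    /** Returns true if the callback ran on a thread that held the stream's monitor. */
    public static boolean callbackRunsUnderLock(Executor executor) {
        Stream stream = new Stream(executor);
        AtomicBoolean heldLock = new AtomicBoolean();
        CountDownLatch done = new CountDownLatch(1);

        stream.writeMessage(() -> {
            // Thread.holdsLock reports on the thread that executes the callback.
            heldLock.set(Thread.holdsLock(stream));
            done.countDown();
        });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback did not run");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return heldLock.get();
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs the task on the calling thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("direct executor, callback under lock: "
                    + callbackRunsUnderLock(direct)); // true
            System.out.println("pooled executor, callback under lock: "
                    + callbackRunsUnderLock(pool));   // false
        } finally {
            pool.shutdown();
        }
    }
}
```

In grpc-java terms, and assuming your test setup calls directExecutor() on 
InProcessServerBuilder and/or InProcessChannelBuilder (I can't see that part 
of your test), dropping that call or passing a thread pool via executor(...) 
should move the listener callbacks off the writing thread in the same way.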

On Tuesday, January 30, 2018 at 5:30:05 AM UTC-8, [email protected] wrote:
>
> I'm working on some unit tests for a Bidi Streaming service 
> implementation. I'm trying to determine if my design is the issue, or if I 
> just need to do something to guard against the deadlock condition.
>
> When the client makes the initial request, the StreamObserver 
> implementation for my service starts a new thread which polls the backend 
> of my system repeatedly until the stream is closed. Each time it finds new 
> messages in the backend, it calls the onNext method of the response 
> StreamObserver to send the message back to the client. In my unit tests, 
> things work properly most of the time, but sporadically enter a deadlock 
> situation. I can't seem to understand why, particularly after adding print 
> statements that seem to indicate that I am not encountering situations 
> where the two threads are trying to write concurrently to the stream.
>
> The interaction pattern shown below is what I expect. Client makes 
> request, server finishes the initial request, then the alternate thread 
> sends the client a response. Client gets the response and then acks it, 
> which the server then receives. This is all as it should be, but it somehow 
> results in the deadlock shown below.
>
> 1517317747082 Client makes initial request
> 1517317747082 Server receives request subscription: 
> "subscription-1-to-test-topic-1"
> 1517317747092 Server finishes with client request
> 1517317747105 Server sends response with 6 messages
> 1517317747105 Client receives response with 6 messages
> 1517317747106 Client makes second request
> 1517317747106 Server receives request modify_deadline_seconds: 60
> modify_deadline_ack_ids: "2-0"
> 1517317747110 Server finishes with client request
>
>
> "subscription-1-to-test-topic-1-0":
>   waiting to lock monitor 0x00007f2fa0005248 (object 0x00000005cf7e5ee0, a 
> io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream),
>   which is held by "main"
> "main":
>   waiting to lock monitor 0x00007f2fa0003b98 (object 0x00000005cf7e61e0, a 
> io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream),
>   which is held by "subscription-1-to-test-topic-1-0"
> Java stack information for the threads listed above:
> ===================================================
> "subscription-1-to-test-topic-1-0":
> at 
> io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.request(InProcessTransport.java:516)
> - waiting to lock <0x00000005cf7e5ee0> (a 
> io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
> at io.grpc.internal.ClientCallImpl.request(ClientCallImpl.java:383)
> at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
> at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
> at 
> io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.request(ClientCalls.java:356)
> at 
> io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:410)
> at 
> io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
> at 
> io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
> at 
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:530)
> at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at 
> io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
> at 
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.messagesAvailable(ClientCallImpl.java:547)
> at 
> io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.writeMessage(InProcessTransport.java:378)
> - locked <0x00000005cf7e61e0> (a 
> io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
> at io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:134)
> at 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
> at 
> com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver.streamMessages(SubscriberImpl.java:372)
> at 
> com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver$$Lambda$13/179779934.run(Unknown Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> "main":
> at 
> io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.request(InProcessTransport.java:325)
> - waiting to lock <0x00000005cf7e61e0> (a 
> io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
> at io.grpc.internal.ServerCallImpl.request(ServerCallImpl.java:78)
> at 
> io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:252)
> at 
> io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:252)
> at 
> io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:626)
> at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at 
> io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
> at 
> io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.messagesAvailable(ServerImpl.java:637)
> at 
> io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.writeMessage(InProcessTransport.java:564)
> - locked <0x00000005cf7e5ee0> (a 
> io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
> at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:438)
> at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
> at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
> at 
> io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:320)
> at 
> com.google.cloud.partners.pubsub.kafka.SubscriberImplTest.streamingPullSingleClientPullModifyAndAck(SubscriberImplTest.java:548)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>
> I've yet to see an example similar to mine where the streaming server 
> handler uses a separate thread to generate and send messages back to the 
> client, so that's why I'm curious as to whether that is my primary issue or 
> not. I appreciate any help and thanks in advance.
>
