I'm working on unit tests for a bidirectional streaming service implementation, and I'm trying to determine whether my design is the problem or whether I just need to add some guard against the deadlock condition described below.

When the client makes the initial request, the StreamObserver 
implementation for my service starts a new thread that repeatedly polls the 
backend of my system until the stream is closed. Each time it finds new 
messages in the backend, it calls the onNext method of the response 
StreamObserver to send them back to the client. In my unit tests this works 
properly most of the time, but it sporadically deadlocks. I can't work out 
why, especially since the print statements I added seem to indicate that the 
two threads are never trying to write to the stream concurrently.
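To make the pattern concrete, here is a stripped-down sketch of the shape of my handler. The names and the queue-backed "backend" are illustrative, not my real code, and StreamObserver is reduced to a minimal stand-in interface (instead of io.grpc.stub.StreamObserver) so the snippet stands alone:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal stand-in for io.grpc.stub.StreamObserver so the sketch stands alone.
interface StreamObserver<T> {
    void onNext(T value);
    void onCompleted();
}

// Illustrative shape of my handler: the initial request starts a polling
// thread that repeatedly checks the backend (modeled here as a queue) and
// forwards each new message to the response observer until the stream closes.
class PollingResponder {
    final BlockingQueue<String> backend = new LinkedBlockingQueue<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final ExecutorService poller = Executors.newSingleThreadExecutor();

    void start(StreamObserver<String> responseObserver) {
        poller.submit(() -> {
            try {
                while (!closed.get()) {
                    String message = backend.poll(10, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        // Writes happen on the polling thread, not a gRPC thread.
                        responseObserver.onNext(message);
                    }
                }
                responseObserver.onCompleted();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    void close() {
        closed.set(true);
        poller.shutdown();
    }
}
```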

The interaction pattern shown below is what I expect: the client makes the 
initial request, the server finishes handling it, and then the polling 
thread sends the client a response. The client receives the response and 
acks it, and the server receives the ack. That is all as it should be, yet 
it somehow results in the deadlock shown in the thread dump below.

1517317747082 Client makes initial request
1517317747082 Server receives request subscription: 
"subscription-1-to-test-topic-1"
1517317747092 Server finishes with client request
1517317747105 Server sends response with 6 messages
1517317747105 Client receives response with 6 messages
1517317747106 Client makes second request
1517317747106 Server receives request modify_deadline_seconds: 60
modify_deadline_ack_ids: "2-0"
1517317747110 Server finishes with client request


"subscription-1-to-test-topic-1-0":
  waiting to lock monitor 0x00007f2fa0005248 (object 0x00000005cf7e5ee0, a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream),
  which is held by "main"
"main":
  waiting to lock monitor 0x00007f2fa0003b98 (object 0x00000005cf7e61e0, a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream),
  which is held by "subscription-1-to-test-topic-1-0"

Java stack information for the threads listed above:
===================================================
"subscription-1-to-test-topic-1-0":
    at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.request(InProcessTransport.java:516)
    - waiting to lock <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
    at io.grpc.internal.ClientCallImpl.request(ClientCallImpl.java:383)
    at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
    at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
    at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.request(ClientCalls.java:356)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:410)
    at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
    at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:530)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.messagesAvailable(ClientCallImpl.java:547)
    at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.writeMessage(InProcessTransport.java:378)
    - locked <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
    at io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:134)
    at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
    at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver.streamMessages(SubscriberImpl.java:372)
    at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver$$Lambda$13/179779934.run(Unknown Source)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
"main":
    at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.request(InProcessTransport.java:325)
    - waiting to lock <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
    at io.grpc.internal.ServerCallImpl.request(ServerCallImpl.java:78)
    at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:252)
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:252)
    at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:626)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
    at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.messagesAvailable(ServerImpl.java:637)
    at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.writeMessage(InProcessTransport.java:564)
    - locked <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
    at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:438)
    at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
    at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
    at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:320)
    at com.google.cloud.partners.pubsub.kafka.SubscriberImplTest.streamingPullSingleClientPullModifyAndAck(SubscriberImplTest.java:548)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
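My reading of the dump is a classic lock-ordering (ABBA) cycle: my polling thread holds the InProcessServerStream monitor inside sendMessage while the direct executor delivers the message inline to the client listener, whose request() then needs the InProcessClientStream monitor; meanwhile "main" holds the client-stream monitor inside its own sendMessage and needs the server-stream monitor the same way. The sketch below (my analogy, not gRPC code: two plain ReentrantLocks stand in for the stream monitors, and tryLock with a timeout reports the cycle instead of hanging) shows the same shape:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Each thread grabs "its" lock first (as sendMessage does), then tries the
// other lock (as the inline-delivered request() does). Latches hold both
// first locks in place, so both tryLocks deterministically time out.
class AbbaDemo {
    static boolean[] run() {
        ReentrantLock clientStream = new ReentrantLock();
        ReentrantLock serverStream = new ReentrantLock();
        CountDownLatch bothHeld = new CountDownLatch(2);
        CountDownLatch bothTried = new CountDownLatch(2);
        boolean[] acquired = new boolean[2];

        Thread mainLike = new Thread(() -> {
            clientStream.lock(); // like "main" inside ClientCallImpl.sendMessage
            try {
                bothHeld.countDown();
                bothHeld.await();
                // ...inline delivery now needs the server-stream monitor
                acquired[0] = serverStream.tryLock(100, TimeUnit.MILLISECONDS);
                if (acquired[0]) serverStream.unlock();
                bothTried.countDown();
                bothTried.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                clientStream.unlock();
            }
        });
        Thread pollerLike = new Thread(() -> {
            serverStream.lock(); // like the poller inside ServerCallImpl.sendMessage
            try {
                bothHeld.countDown();
                bothHeld.await();
                // ...inline delivery now needs the client-stream monitor
                acquired[1] = clientStream.tryLock(100, TimeUnit.MILLISECONDS);
                if (acquired[1]) clientStream.unlock();
                bothTried.countDown();
                bothTried.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                serverStream.unlock();
            }
        });
        mainLike.start();
        pollerLike.start();
        try {
            mainLike.join();
            pollerLike.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired; // both false: each thread was blocked on the other's lock
    }
}
```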

I have yet to see an example like mine, where the streaming server handler 
uses a separate thread to generate and send messages back to the client, 
which is why I'm curious whether that pattern is the primary issue. I 
appreciate any help, and thanks in advance.
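In case it helps frame the question: the guard I had in mind would serialize every write to the response observer through a single writer thread, so the polling thread and the gRPC callback thread never touch the stream concurrently. Whether that actually breaks the cycle in the in-process transport is exactly what I'm unsure about. A minimal sketch of the idea (again with StreamObserver reduced to a stand-in interface, and names of my own invention):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Minimal stand-in for io.grpc.stub.StreamObserver so the sketch stands alone.
interface StreamObserver<T> {
    void onNext(T value);
    void onCompleted();
}

// Funnels all writes through one single-threaded executor, so callers on any
// thread hand off to the same writer thread instead of writing directly.
class SerializingObserver<T> implements StreamObserver<T> {
    private final StreamObserver<T> delegate;
    private final ExecutorService writer = Executors.newSingleThreadExecutor();

    SerializingObserver(StreamObserver<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onNext(T value) {
        writer.execute(() -> delegate.onNext(value)); // queued, never concurrent
    }

    @Override
    public void onCompleted() {
        writer.execute(delegate::onCompleted);
        writer.shutdown(); // no writes accepted after completion
    }

    // Test hook: wait for all queued writes to drain after onCompleted.
    boolean awaitDrained(long millis) {
        try {
            return writer.awaitTermination(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```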

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/d1df2ba7-fab1-4aa4-a8bd-95e40ded3217%40googlegroups.com.