Yes, I'm using the direct executor that comes as part of the GrpcServerRule @Rule for testing.
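For anyone following along, here is a rough sketch of what moving off the direct executor could look like. It bypasses GrpcServerRule and wires the in-process server/channel by hand; the class and method names are illustrative, not the poster's actual test code:

```java
// Sketch (assumes grpc-java on the classpath): give the in-process server a
// real executor so server-side callbacks run on their own thread instead of
// re-entering the caller's thread, which is what a direct executor does.
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.grpc.BindableService;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;

public class SubscriberTestSetup {
  private final ExecutorService serverExecutor = Executors.newSingleThreadExecutor();

  Server startServer(String serverName, BindableService service) throws IOException {
    return InProcessServerBuilder.forName(serverName)
        .executor(serverExecutor)   // normal executor instead of directExecutor()
        .addService(service)
        .build()
        .start();
  }

  ManagedChannel buildChannel(String serverName) {
    // Giving the client side its own executor as well keeps stub callbacks
    // off the test's main thread.
    return InProcessChannelBuilder.forName(serverName)
        .executor(Executors.newSingleThreadExecutor())
        .build();
  }
}
```

With this setup, `onNext` calls from the test thread and from the service's polling thread are dispatched onto executor threads rather than executed inline while transport locks are held.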
I also tried Spencer's suggestion, but the onReady handler only seemed to fire once, when the stream was initialized, so I was not able to get the repeated polling I needed to send messages to the client.

On Jan 30, 2018 5:20 PM, "'Kun Zhang' via grpc.io" <[email protected]> wrote:

> Are you using direct executor? If so, switching to a normal executor should avoid the deadlock.
>
> On Tuesday, January 30, 2018 at 5:30:05 AM UTC-8, [email protected] wrote:
>>
>> I'm working on some unit tests for a Bidi Streaming service implementation. I'm trying to determine whether my design is the issue, or whether I just need to do something to guard against the deadlock condition.
>>
>> When the client makes the initial request, the StreamObserver implementation for my service starts a new thread that polls the backend of my system repeatedly until the stream is closed. Each time it finds new messages in the backend, it calls the onNext method of the response StreamObserver to send the message back to the client. In my unit tests, things work properly most of the time but sporadically enter a deadlock. I can't understand why, particularly after adding print statements that seem to indicate the two threads are never trying to write to the stream concurrently.
>>
>> The interaction pattern shown below is what I expect: the client makes a request, the server finishes handling it, then the alternate thread sends the client a response. The client receives the response and acks it, and the server receives the ack. This is all as it should be, yet it somehow results in the deadlock shown below.
>>
>> 1517317747082 Client makes initial request
>> 1517317747082 Server receives request subscription: "subscription-1-to-test-topic-1"
>> 1517317747092 Server finishes with client request
>> 1517317747105 Server sends response with 6 messages
>> 1517317747105 Client receives response with 6 messages
>> 1517317747106 Client makes second request
>> 1517317747106 Server receives request modify_deadline_seconds: 60 modify_deadline_ack_ids: "2-0"
>> 1517317747110 Server finishes with client request
>>
>> "subscription-1-to-test-topic-1-0":
>>   waiting to lock monitor 0x00007f2fa0005248 (object 0x00000005cf7e5ee0, a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream),
>>   which is held by "main"
>> "main":
>>   waiting to lock monitor 0x00007f2fa0003b98 (object 0x00000005cf7e61e0, a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream),
>>   which is held by "subscription-1-to-test-topic-1-0"
>>
>> Java stack information for the threads listed above:
>> ===================================================
>> "subscription-1-to-test-topic-1-0":
>>   at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.request(InProcessTransport.java:516)
>>   - waiting to lock <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
>>   at io.grpc.internal.ClientCallImpl.request(ClientCallImpl.java:383)
>>   at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
>>   at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
>>   at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.request(ClientCalls.java:356)
>>   at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:410)
>>   at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
>>   at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
>>   at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:530)
>>   at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>   at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
>>   at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.messagesAvailable(ClientCallImpl.java:547)
>>   at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.writeMessage(InProcessTransport.java:378)
>>   - locked <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
>>   at io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:134)
>>   at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
>>   at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver.streamMessages(SubscriberImpl.java:372)
>>   at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver$$Lambda$13/179779934.run(Unknown Source)
>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> "main":
>>   at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.request(InProcessTransport.java:325)
>>   - waiting to lock <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
>>   at io.grpc.internal.ServerCallImpl.request(ServerCallImpl.java:78)
>>   at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:252)
>>   at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:252)
>>   at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:626)
>>   at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>   at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
>>   at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.messagesAvailable(ServerImpl.java:637)
>>   at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.writeMessage(InProcessTransport.java:564)
>>   - locked <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
>>   at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:438)
>>   at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
>>   at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
>>   at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:320)
>>   at com.google.cloud.partners.pubsub.kafka.SubscriberImplTest.streamingPullSingleClientPullModifyAndAck(SubscriberImplTest.java:548)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>
>> I've yet to see an example similar to mine where the streaming server handler uses a separate thread to generate and send messages back to the client, so I'm curious whether that is my primary issue. I appreciate any help, and thanks in advance.
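For reference, the thread dump quoted above is a textbook AB-BA deadlock: "main" holds the client-stream monitor and waits for the server-stream monitor, while the polling thread holds the server-stream monitor and waits for the client-stream monitor. The general cure, when you control the locks, is a single global acquisition order. A minimal, self-contained illustration (plain Java, not gRPC code):

```java
// Illustration of avoiding an AB-BA deadlock with consistent lock ordering.
// The two Objects stand in for the InProcessClientStream / InProcessServerStream
// monitors from the thread dump; the looping threads stand in for "main" and
// the subscription polling thread.
public class LockOrderDemo {
  private static final Object CLIENT_STREAM = new Object();
  private static final Object SERVER_STREAM = new Object();

  // Both code paths take CLIENT_STREAM before SERVER_STREAM, so no thread
  // can ever hold one monitor while waiting on a thread that holds the other.
  static void sendFromServer() {
    synchronized (CLIENT_STREAM) {
      synchronized (SERVER_STREAM) {
        // write a message toward the client
      }
    }
  }

  static void sendFromClient() {
    synchronized (CLIENT_STREAM) {
      synchronized (SERVER_STREAM) {
        // write a message toward the server
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Thread server = new Thread(() -> { for (int i = 0; i < 10_000; i++) sendFromServer(); });
    Thread client = new Thread(() -> { for (int i = 0; i < 10_000; i++) sendFromClient(); });
    server.start();
    client.start();
    server.join();
    client.join();
    System.out.println("no deadlock");  // both threads completed their loops
  }
}
```

Here the locks live inside gRPC's in-process transport, so you can't reorder them yourself; the practical equivalent is Kun's suggestion of a real executor, which breaks the cycle by never re-entering the transport while its monitor is held.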

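On the onReady point: grpc-java's documented flow-control pattern is to cast the response observer to ServerCallStreamObserver and write from setOnReadyHandler, which fires on every not-ready-to-ready transition, not just once. On an in-process channel with a direct executor the stream may simply never become un-ready, which would explain seeing a single invocation. A rough sketch of the pattern; PullRequest/PullResponse and drainOne() are placeholders for the poster's Pub/Sub types and backend read, so this is not a drop-in fix:

```java
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

class StreamingPullService {
  public StreamObserver<PullRequest> streamingPull(StreamObserver<PullResponse> raw) {
    ServerCallStreamObserver<PullResponse> observer =
        (ServerCallStreamObserver<PullResponse>) raw;

    // Invoked on a gRPC thread each time the stream becomes writable again.
    // Writing only while isReady() respects the transport's flow-control
    // window instead of buffering unboundedly from a polling thread.
    observer.setOnReadyHandler(() -> {
      while (observer.isReady()) {
        PullResponse next = drainOne();  // placeholder: read one batch from the backend
        if (next == null) {
          return;                        // nothing buffered; resume when data arrives
        }
        observer.onNext(next);
      }
    });

    return new StreamObserver<PullRequest>() {
      @Override public void onNext(PullRequest request) { /* handle acks, deadline mods */ }
      @Override public void onError(Throwable t) { /* clean up */ }
      @Override public void onCompleted() { observer.onCompleted(); }
    };
  }

  private PullResponse drainOne() { return null; }   // placeholder
}
```

Because new backend messages can arrive while the stream is idle but ready, this still needs a nudge from the backend side (e.g. re-running the same drain loop when data lands), so the handler alone doesn't replace all of the polling thread's job.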