Yes, I'm using the direct executor that comes with the GrpcServerRule
@Rule for testing.
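
To make the executor point concrete, here is a minimal sketch that uses only
java.util.concurrent, not gRPC itself. The HandoffDemo and Side classes and
their write()/request() methods are hypothetical stand-ins for the in-process
client and server streams in the thread dump quoted below: write() takes its
own side's lock and schedules a callback that needs the peer's lock. The
sketch assumes the callbacks are handed to a thread pool, so no thread ever
waits for one stream's lock while holding the other's.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class HandoffDemo {
    /** Hypothetical stand-in for one side of an in-process stream. */
    static final class Side {
        private final Executor callbackExecutor;
        Side peer;
        final AtomicInteger requests = new AtomicInteger();

        Side(Executor callbackExecutor) {
            this.callbackExecutor = callbackExecutor;
        }

        // Holds this side's lock while scheduling the peer callback.
        // With a direct executor (Runnable::run) the callback would run
        // inline, still under this lock, and then block on the peer's lock.
        synchronized void write() {
            callbackExecutor.execute(() -> peer.request());
        }

        // Needs this side's lock, like request() in the thread dump.
        synchronized void request() {
            requests.incrementAndGet();
        }
    }

    /** Both sides write concurrently; returns the number of callbacks run. */
    static int runWithHandoff(int rounds) {
        ExecutorService callbacks = Executors.newFixedThreadPool(2);
        Side client = new Side(callbacks);
        Side server = new Side(callbacks);
        client.peer = server;
        server.peer = client;

        Thread testThread = new Thread(() -> {
            for (int i = 0; i < rounds; i++) client.write();
        });
        Thread pollerThread = new Thread(() -> {
            for (int i = 0; i < rounds; i++) server.write();
        });
        testThread.start();
        pollerThread.start();
        try {
            testThread.join();
            pollerThread.join();
            callbacks.shutdown(); // queued callbacks still run to completion
            if (!callbacks.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return client.requests.get() + server.requests.get();
    }

    public static void main(String[] args) {
        System.out.println(runWithHandoff(1000)); // prints 2000
    }
}
```

Passing Runnable::run as the executor instead would run each callback while
the writer's lock is still held, which is the lock-order inversion the dump
shows. In grpc-java terms, the hand-off roughly corresponds to not calling
directExecutor() on the in-process server and channel, though I have not
verified that against this particular test setup.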

I also tried Spencer's suggestion but the onReady handler only seemed to
fire once upon the stream's initialization, so I was not able to get the
repeated polling I needed to send messages to the client.

On Jan 30, 2018 5:20 PM, "'Kun Zhang' via grpc.io" <[email protected]>
wrote:

> Are you using direct executor? If so, switching to a normal executor
> should avoid the deadlock.
>
> On Tuesday, January 30, 2018 at 5:30:05 AM UTC-8, [email protected]
> wrote:
>>
>> I'm working on some unit tests for a Bidi Streaming service
>> implementation. I'm trying to determine if my design is the issue, or if I
>> just need to do something to guard against the deadlock condition.
>>
>> When the client makes the initial request, the StreamObserver
>> implementation for my service starts a new thread which polls the backend
>> of my system repeatedly until the stream is closed. Each time it finds new
>> messages in the backend, it calls the onNext method of the response
>> StreamObserver to send the message back to the client. In my unit tests,
>> things work properly most of the time, but sporadically enter a deadlock
>> situation. I can't seem to understand why, particularly after adding print
>> statements that seem to indicate that I am not encountering situations
>> where the two threads are trying to write concurrently to the stream.
>>
>> The interaction pattern shown below is what I expect. Client makes
>> request, server finishes the initial request, then the alternate thread
>> sends the client a response. Client gets the response and then acks it,
>> which the server then receives. This is all as it should be, but it somehow
>> results in the deadlock shown below.
>>
>> 1517317747082 Client makes initial request
>> 1517317747082 Server receives request subscription: "subscription-1-to-test-topic-1"
>> 1517317747092 Server finishes with client request
>> 1517317747105 Server sends response with 6 messages
>> 1517317747105 Client receives response with 6 messages
>> 1517317747106 Client makes second request
>> 1517317747106 Server receives request modify_deadline_seconds: 60 modify_deadline_ack_ids: "2-0"
>> 1517317747110 Server finishes with client request
>>
>>
>> "subscription-1-to-test-topic-1-0":
>>   waiting to lock monitor 0x00007f2fa0005248 (object 0x00000005cf7e5ee0, a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream),
>>   which is held by "main"
>> "main":
>>   waiting to lock monitor 0x00007f2fa0003b98 (object 0x00000005cf7e61e0, a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream),
>>   which is held by "subscription-1-to-test-topic-1-0"
>> Java stack information for the threads listed above:
>> ===================================================
>> "subscription-1-to-test-topic-1-0":
>> at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.request(InProcessTransport.java:516)
>> - waiting to lock <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
>> at io.grpc.internal.ClientCallImpl.request(ClientCallImpl.java:383)
>> at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
>> at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
>> at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.request(ClientCalls.java:356)
>> at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:410)
>> at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
>> at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
>> at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:530)
>> at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>> at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
>> at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.messagesAvailable(ClientCallImpl.java:547)
>> at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.writeMessage(InProcessTransport.java:378)
>> - locked <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
>> at io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:134)
>> at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
>> at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver.streamMessages(SubscriberImpl.java:372)
>> at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver$$Lambda$13/179779934.run(Unknown Source)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> "main":
>> at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.request(InProcessTransport.java:325)
>> - waiting to lock <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
>> at io.grpc.internal.ServerCallImpl.request(ServerCallImpl.java:78)
>> at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:252)
>> at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:252)
>> at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:626)
>> at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>> at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
>> at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.messagesAvailable(ServerImpl.java:637)
>> at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.writeMessage(InProcessTransport.java:564)
>> - locked <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
>> at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:438)
>> at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
>> at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
>> at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:320)
>> at com.google.cloud.partners.pubsub.kafka.SubscriberImplTest.streamingPullSingleClientPullModifyAndAck(SubscriberImplTest.java:548)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>
>> I've yet to see an example similar to mine where the streaming server
>> handler uses a separate thread to generate and send messages back to the
>> client, so that's why I'm curious as to whether that is my primary issue or
>> not. I appreciate any help and thanks in advance.
>>
> --
> You received this message because you are subscribed to a topic in the
> Google Groups "grpc.io" group.
> To unsubscribe from this topic, visit https://groups.google.com/d/
> topic/grpc-io/oS1Eb8UkIjM/unsubscribe.
> To unsubscribe from this group and all its topics, send an email to
> [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/grpc-io.
> To view this discussion on the web visit https://groups.google.com/d/
> msgid/grpc-io/3102f1b5-cd76-4658-a550-1882a423ef04%40googlegroups.com
> <https://groups.google.com/d/msgid/grpc-io/3102f1b5-cd76-4658-a550-1882a423ef04%40googlegroups.com?utm_medium=email&utm_source=footer>
> .
> For more options, visit https://groups.google.com/d/optout.
>
