A possible caveat is that the StreamObserver is not synchronized, and will
require the application to provide thread safety if shared across threads.
A workaround is to use the ServerCallStreamObserver's onReadyHandler to
push messages:
https://grpc.io/grpc-java/javadoc/io/grpc/stub/CallStreamObserver.html#setOnReadyHandler-java.lang.Runnable-

On Tue, Jan 30, 2018 at 5:30 AM, prodonjs via grpc.io <
[email protected]> wrote:

> I'm working on some unit tests for a Bidi Streaming service
> implementation. I'm trying to determine if my design is the issue, or if I
> just need to do something to guard against the deadlock condition.
>
> When the client makes the initial request, the StreamObserver
> implementation for my service starts a new thread which polls the backend
> of my system repeatedly until the stream is closed. Each time it finds new
> messages in the backend, it calls the onNext method of the response
> StreamObserver to send the message back to the client. In my unit tests,
> things work properly most of the time, but sporadically enter a deadlock
> situation. I can't seem to understand why, particularly after adding print
> statements that seem to indicate that I am not encountering situations
> where the two threads are trying to write concurrently to the stream.
>
> The interaction pattern shown below is what I expect. Client makes
> request, server finishes the initial request, then the alternate thread
> sends the client a response. Client gets the response and then acks it,
> which the server then receives. This is all as it should be, but it somehow
> results in the deadlock shown below.
>
> 1517317747082 Client makes initial request
> 1517317747082 Server receives request subscription:
> "subscription-1-to-test-topic-1"
> 1517317747092 Server finishes with client request
> 1517317747105 Server sends response with 6 messages
> 1517317747105 Client receives response with 6 messages
> 1517317747106 Client makes second request
> 1517317747106 Server receives request modify_deadline_seconds: 60
> modify_deadline_ack_ids: "2-0"
> 1517317747110 Server finishes with client request
>
>
> "subscription-1-to-test-topic-1-0":
>   waiting to lock monitor 0x00007f2fa0005248 (object 0x00000005cf7e5ee0, a
> io.grpc.inprocess.InProcessTransport$InProcessStream$
> InProcessClientStream),
>   which is held by "main"
> "main":
>   waiting to lock monitor 0x00007f2fa0003b98 (object 0x00000005cf7e61e0, a
> io.grpc.inprocess.InProcessTransport$InProcessStream$
> InProcessServerStream),
>   which is held by "subscription-1-to-test-topic-1-0"
> Java stack information for the threads listed above:
> ===================================================
> "subscription-1-to-test-topic-1-0":
> at io.grpc.inprocess.InProcessTransport$InProcessStream$
> InProcessClientStream.request(InProcessTransport.java:516)
> - waiting to lock <0x00000005cf7e5ee0> (a io.grpc.inprocess.
> InProcessTransport$InProcessStream$InProcessClientStream)
> at io.grpc.internal.ClientCallImpl.request(ClientCallImpl.java:383)
> at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
> at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
> at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.
> request(ClientCalls.java:356)
> at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAd
> apter.onMessage(ClientCalls.java:410)
> at io.grpc.ForwardingClientCallListener.onMessage(
> ForwardingClientCallListener.java:36)
> at io.grpc.ForwardingClientCallListener.onMessage(
> ForwardingClientCallListener.java:36)
> at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$
> 1MessagesAvailable.runInContext(ClientCallImpl.java:530)
> at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(
> SerializeReentrantCallsDirectExecutor.java:49)
> at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.
> messagesAvailable(ClientCallImpl.java:547)
> at io.grpc.inprocess.InProcessTransport$InProcessStream$
> InProcessServerStream.writeMessage(InProcessTransport.java:378)
> - locked <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$
> InProcessStream$InProcessServerStream)
> at io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:134)
> at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.
> onNext(ServerCalls.java:341)
> at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$
> StreamingPullStreamObserver.streamMessages(SubscriberImpl.java:372)
> at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$
> StreamingPullStreamObserver$$Lambda$13/179779934.run(Unknown Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> "main":
> at io.grpc.inprocess.InProcessTransport$InProcessStream$
> InProcessServerStream.request(InProcessTransport.java:325)
> - waiting to lock <0x00000005cf7e61e0> (a io.grpc.inprocess.
> InProcessTransport$InProcessStream$InProcessServerStream)
> at io.grpc.internal.ServerCallImpl.request(ServerCallImpl.java:78)
> at io.grpc.stub.ServerCalls$StreamingServerCallHandler$
> StreamingServerCallListener.onMessage(ServerCalls.java:252)
> at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.
> messagesAvailable(ServerCallImpl.java:252)
> at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerS
> treamListener$1MessagesAvailable.runInContext(ServerImpl.java:626)
> at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(
> SerializeReentrantCallsDirectExecutor.java:49)
> at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerS
> treamListener.messagesAvailable(ServerImpl.java:637)
> at io.grpc.inprocess.InProcessTransport$InProcessStream$
> InProcessClientStream.writeMessage(InProcessTransport.java:564)
> - locked <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$
> InProcessStream$InProcessClientStream)
> at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:438)
> at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
> at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
> at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.
> onNext(ClientCalls.java:320)
> at com.google.cloud.partners.pubsub.kafka.SubscriberImplTest.
> streamingPullSingleClientPullModifyAndAck(SubscriberImplTest.java:548)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(
> FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(
> ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(
> FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.
> evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.
> evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(
> RunAfters.java:27)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>
> I've yet to see an example similar to mine where the streaming server
> handler uses a separate thread to generate and send messages back to the
> client, so that's why I'm curious as to whether that is my primary issue or
> not. I appreciate any help and thanks in advance.
>
> --
> You received this message because you are subscribed to the Google Groups "
> grpc.io" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/grpc-io.
> To view this discussion on the web visit https://groups.google.com/d/
> msgid/grpc-io/d1df2ba7-fab1-4aa4-a8bd-95e40ded3217%40googlegroups.com
> <https://groups.google.com/d/msgid/grpc-io/d1df2ba7-fab1-4aa4-a8bd-95e40ded3217%40googlegroups.com?utm_medium=email&utm_source=footer>
> .
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Spencer Fang

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/CAK%3D-x_4qGt-QyjFO4vh5L00%2B6PFrwfJQcAjaji3RrwMxZ%3DH9_Q%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to