On async streams on servers, you simple call stream->Finish(const Status
<https://cs.corp.google.com/piper///depot/google3/third_party/grpc/google_specific/include/grpc%2B%2B/impl/codegen/status.h?l=24&ct=xref_jump_to_def&gsn=Status&rcl=161340537>
&
<https://cs.corp.google.com/piper///depot/google3/GENERATED/figments/cpp/LValueRefTo/Const/start-with-gr/grpc/class-Status.cc?l=3&ct=xref_jump_to_def&gsn=%26&rcl=161340537>
status
<https://cs.corp.google.com/piper///depot/google3/third_party/grpc/include/grpc%2B%2B/impl/codegen/async_stream.h?l=675&gs=cpp%253Agrpc%253A%253Aclass-ServerAsyncWriterInterface%253C%25231%253E%253A%253AFinish(const%2Bgrpc%253A%253AStatus%2B%2526%252C%2Bvoid%2B*)%253A%253Aparam-status%2540google3%252Fthird_party%252Fgrpc%252Finclude%252Fgrpc%252B%252B%252Fimpl%252Fcodegen%252Fasync_stream.h%253A28320%257Cdecl&gsn=status&ct=xref_usages>,
void*
<https://cs.corp.google.com/piper///depot/google3/GENERATED/figments/cpp/PointerTo/void.cc?l=3&ct=xref_jump_to_def&gsn=*&rcl=161340537>
tag
<https://cs.corp.google.com/piper///depot/google3/third_party/grpc/include/grpc%2B%2B/impl/codegen/async_stream.h?l=675&gs=cpp%253Agrpc%253A%253Aclass-ServerAsyncWriterInterface%253C%25231%253E%253A%253AFinish(const%2Bgrpc%253A%253AStatus%2B%2526%252C%2Bvoid%2B*)%253A%253Aparam-tag%2540google3%252Fthird_party%252Fgrpc%252Finclude%252Fgrpc%252B%252B%252Fimpl%252Fcodegen%252Fasync_stream.h%253A28334%257Cdecl&gsn=tag&ct=xref_usages>)
to send the final status (and do a Cq->next) and close the stream (from
server side). On the client side, the client would do a
cli_stream->Read() Which would fail (since the stream is closed by
server). The client would then do a cli_stream->Finish() to get the status
sent by the server. That is the normal sequence. You really do not need to
do TryCancel(). Just FYI, the following is the typical sequence of events
on a Bidi Stream on client and server side:
Sequence of events on the client side:
1) Create a client bidi stream "cli_stream"
2) Do one or more cli_stream->Read() and cli_stream->Write() (need to match
3) "half-close" i.e close the stream (for writes) from client side by
doing: cli_stream->WritesDone();
4) Note: At this point, cli_stream->Read() will still work (since the
server has not closed the stream from its side)
5) Once the server closes the stream from it's side (cli_stream->Read()
would return false)
6) Do cli_stream->Finish() to get the status from the server
Sequence of events on the server side:
1) Create a server bidi stream "server_stream"
2) Do one more server_stream->Read() and server_stream->Write() (need to
match with step #2 in client side)
3) Once client does a close from its side, server_stream->Read() would
return "false"
4) Note: At this point server_stream->Write() will still work.
5) Now close the writes stream from server side and send a status by
calling server_stream->Finish()
---
Now to specifically answer your question:
>>> If I want to close from the server side by TryCancel() and followed by
a Finish() will this cause any problem?
If for whatever reason you decided to do TryCancel() first on server
followed by server_stream->Finish(), it is a race-condition (The
TryCancel() API is best-effort API and doesn't guarantee the stream will be
cancelled right-away. So it is possible that operation may fail and the
stream wasn't cancelled by the time you called Finish(). In this case,
Finish() would succeed and you get a success event in CQ. If not, i.e if
stream cancel happened by the time you called Finish(), then you get a
failure event in CQ (with status CANCELLED - I think..i am not sure of
exact status code).
>>>After what I called stream->Finish(status), will this event return in
the CQ if the client is closed already?
Well, from the sequence of events I posted above, if the client did a
"half-close" by doing a cli_stream->WritesDone(), then its perfectly safe
for server to do stream->Finish(status).
However, if the client closed the stream by calling "TryCancel", then again
its a race condition. If the cancellation reached server, then
stream->Finish(status) will fail (i.e a failed event with return on CQ)
else, it would succeed if the cancellation did not happen yet.
Hope this helps,
Sree
On Sunday, July 2, 2017 at 2:57:47 PM UTC-7, yihao yang wrote:
>
> Hi, Sree:
>
> Do you know what if the stream is an async bidi stream? If I want to close
> from the server side by TryCancel() and followed by a Finish(), will this
> cause any problem? After what I called stream->Finish(status), will this
> event return in the CQ if the client is closed already?
>
> Thanks,
> Yihao
>
> 在 2017年6月27日星期二 UTC-7下午4:18:43,Sree Kuchibhotla写道:
>>
>> Sorry for the late response.
>>
>> There is no special method to 'close' the BiDi streams. On the server,
>> just returning a status would mean that you are done with the stream.
>>
>> However, in the example you have given, you seem to be calling just 1
>> read. Since you mentioned you are noticing a memory leak, I was
>> wondering...are you sure that you are reading all the messages that client
>> is sending ?
>>
>> You don't have to really check 'context->IsCancelled()' here.
>> stream->Read() has a return value. It returns 'true' as long as there is a
>> message to read from the client. stream->Read() returns a 'false' either
>> when the client gracefully finished the writes (by calling
>> stream->WritesDone() and stream->Finish() on the client side) or if the
>> client cancelled the RPC or there was some other error..
>>
>> So I recommend you rewrite your loop as
>>
>> Status BidiServiceImpl::BiDiStreamingRpc(...)
>> {
>> ..
>> while (stream->Read(&req)) {
>> getRest(resp);
>> stream->Write(resp);
>> }
>>
>> // You can check context->IsCancelled() here if you
>> // are interested in whether the RPC was cancelled
>>
>> return Status(..)
>> }
>>
>> thanks,
>> Sree
>>
>>
>> On Monday, May 1, 2017 at 10:45:37 PM UTC-7, [email protected]
>> wrote:
>>>
>>>
>>> Hello,
>>>
>>> Is there a way to close down BiDi streaming gracefully from C++ server
>>> thread (pthr) when c++ client gets aborted? When I simply return from Bidi
>>> streaming rpc based on IsCancelled() I observe memory leaks.
>>>
>>> Here is the sample code
>>>
>>>
>>> Status BiDiServiceImpl::BiDiStreamingRpc(::grpc::ServerContext* context,
>>> ::grpc::ServerReaderWriter< BiDiResponse, BiDiRequest>* stream)
>>> {
>>> BiDiRequest req;
>>> BiDiResponse resp;
>>>
>>> stream->Read(&req);
>>> while(context->IsCancelled() == false)
>>> {
>>> getResp(resp);
>>> stream->Write(resp);
>>> }
>>> return(Status(Status::GRPC_OK, ""));
>>> }
>>>
>>>
>>> Thanks
>>> Rajeev
>>>
>>>
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/dc1a9aeb-5f8c-4291-a755-9b3db1d063ca%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.