I did mean server-streaming, sorry for the confusion. And awesome, that's
exactly what I was trying to figure out, thanks!
As for the Python part, here is a snippet:

    def GrpcService(self, request, context):
        arg = request.arg
        data_stream = some_function(arg)
        # callback to free up resources
        context.add_callback(data_stream.close)
        for data in data_stream:
            # indefinitely serve data
            yield data

The add_callback() is from
http://www.grpc.io/grpc/python/grpc.html?highlight=add_callback#grpc.RpcContext.add_callback
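
To illustrate why cancelling matters for this handler, here is a rough, self-contained sketch that runs without a gRPC server. FakeStream and FakeContext are hypothetical stand-ins I made up for the example; only the add_callback() name and its boolean return value mirror the real grpc.RpcContext.add_callback. The terminate() method is also hypothetical and simply simulates the RPC ending, for example because the client cancelled its context.

```python
import itertools


class FakeStream:
    """Hypothetical stand-in for whatever some_function() returns."""

    def __init__(self, items):
        self._items = iter(items)
        self.closed = False

    def __iter__(self):
        return self

    def __next__(self):
        # Stop producing data once close() has been called.
        if self.closed:
            raise StopIteration
        return next(self._items)

    def close(self):
        self.closed = True


class FakeContext:
    """Hypothetical stand-in for the servicer context (callbacks only)."""

    def __init__(self):
        self._callbacks = []
        self._active = True

    def add_callback(self, callback):
        # Like the real method: False if the RPC has already terminated.
        if not self._active:
            return False
        self._callbacks.append(callback)
        return True

    def terminate(self):
        # Simulates RPC termination, e.g. the client cancelling.
        self._active = False
        for callback in self._callbacks:
            callback()


def grpc_service(arg, context, some_function):
    """Same shape as the handler above, minus the servicer class."""
    data_stream = some_function(arg)
    context.add_callback(data_stream.close)
    for data in data_stream:
        yield data


stream = FakeStream(itertools.count())  # endless data source
context = FakeContext()
responses = grpc_service(0, context, lambda arg: stream)

first = [next(responses) for _ in range(3)]  # client reads a few messages
context.terminate()                          # client cancels
rest = list(responses)                       # handler loop now ends
```

In this sketch the handler keeps serving until terminate() fires the callback. A half-close from the client has no counterpart here, which matches what I saw: the cleanup only ran once the call was actually cancelled.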
On Monday, February 6, 2017 at 3:26:23 PM UTC-6, Josh Humphries wrote:
>
> On Mon, Feb 6, 2017 at 1:49 PM, Michael Bond <[email protected]> wrote:
>
>> This is a unary to stream, not stream to stream rpc, if that changes
>> anything.
>>
>
> Do you mean server-streaming (e.g. unary request message, streaming
> response)? If that is the case, calling stream.CloseSend() has no effect as
> the stream is already closed. Take a look at the generated code for a
> server-streaming method, and you'll see that the generated stub calls
> SendMsg() and CloseSend() for you:
> https://github.com/grpc/grpc-go/blob/883bfc7bc8feeb7d90501b977e1c23447b9ff136/test/grpc_testing/test.pb.go#L416
>
>
>> So would calling CloseSend() send a message to the python server? Then
>> would I just need to handle said message to make sure the callbacks are
>> executed?
>>
>
>> Basic flow:
>> Go client opens stream to python server with args
>> Python server dumps back data to go client while the client reads it
>> Go client no longer needs particular data and closes the stream
>>
>
> If the client no longer cares about the response stream and has already
> finished sending request messages, then cancelling the context is the
> appropriate action to take.
>
>
>> Python server executes callback functions to clean up resources <- this
>> is what currently wasn't happening with CloseSend() until I added a child
>> context and cancelled it
>>
>
> I still don't quite understand how you've got this wired up. A code sample
> of your Python server code might help. But, in any event, it sounds like
> you genuinely need the client to cancel.
>
> Another possibility could be to use bi-di streaming and have the server
> poll for request messages, doing the cleanup when it reaches the end of
> the request stream. But it sounds like that might only add needless
> complexity to the server endpoint.
>
>
>>
>> On Monday, February 6, 2017 at 12:24:33 PM UTC-6, Josh Humphries wrote:
>>
>>> On Mon, Feb 6, 2017 at 1:19 PM, Michael Bond <[email protected]>
>>> wrote:
>>>
>>>> Thanks for the quick reply. I should have specified that the code in the
>>>> original post is a snippet; there's receiving logic underneath it.
>>>>
>>>> Some more details surrounding this: in this case I have a callback
>>>> function on the server (written in Python) that needs to be executed to
>>>> free resources, and closing the sending portion does not seem to trigger
>>>> it. Also, the client in this case dictates all connections. The server
>>>> simply pours a stream of data to the client until the client no longer
>>>> needs that particular data. So, to be more specific with my question: how
>>>> would I fully close the stream from the client's side?
>>>>
>>>
>>> Not sure I follow 100%. But the python code should have similar logic
>>> where it is receiving the request messages. When the client half-closes the
>>> stream, the server would get EOF trying to receive (or if python APIs were
>>> async/push, like the Java APIs are, you'd get an "end of stream"
>>> notification). Is that where you are doing the clean up?
>>>
>>>
>>>
>>>>
>>>> On Monday, February 6, 2017 at 9:50:04 AM UTC-6, Josh Humphries wrote:
>>>>>
>>>>> Perhaps more helpful: in your code example, you would then consume the
>>>>> responses by calling Recv() on the stream until it returns an error
>>>>> (io.EOF on successful end of stream or some other error if the call
>>>>> fails). Even if you are not expecting any response data from the server,
>>>>> you want to call Recv() in order to learn the ultimate disposition of the
>>>>> call (did it result in an error in the server or was it processed
>>>>> successfully?).
>>>>>
>>>>> log.Println("stream starting")
>>>>> streamCtx, cancel := context.WithCancel(ctx)
>>>>> defer cancel()
>>>>> stream, err := grpcStream(streamCtx, otherArgs)
>>>>> if err != nil {
>>>>>     errCh <- err
>>>>>     return
>>>>> }
>>>>> defer stream.CloseSend()
>>>>> defer log.Println("closing stream")
>>>>> for {
>>>>>     msg, err := stream.Recv()
>>>>>     if err == io.EOF {
>>>>>         // server ended the response stream successfully
>>>>>         break
>>>>>     }
>>>>>     if err != nil {
>>>>>         errCh <- err
>>>>>         return
>>>>>     }
>>>>>     _ = msg // handle the response message here
>>>>> }
>>>>>
>>>>>
>>>>> ----
>>>>>
>>>>> Josh Humphries
>>>>>
>>>>> FullStory <https://www.fullstory.com/> | Atlanta, GA
>>>>>
>>>>> Software Engineer
>>>>>
>>>>> [email protected]
>>>>>
>>>>> On Mon, Feb 6, 2017 at 10:36 AM, Josh Humphries <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> On the client, the CloseSend method is "half-closing" the stream. So
>>>>>> it closes the request/upload half of the stream. The stream remains open
>>>>>> until the server closes the other half: the response/download part of
>>>>>> the stream. Cancelling the stream also closes it (as would the channel
>>>>>> being disconnected or the call timing out).
>>>>>>
>>>>>> On Mon, Feb 6, 2017 at 10:29 AM, Michael Bond <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey, trying to make sure I'm doing this correctly.
>>>>>>>
>>>>>>> Right now I'm having issues with closing streams started with a
>>>>>>> context that is passed around and exists for quite a while.
>>>>>>>
>>>>>>> In this example "ctx" is passed around to many goroutines. I want
>>>>>>> to keep "ctx" around, but passing it to "grpcStream" seems to keep the
>>>>>>> stream from actually closing. What I did below fixed the issue, but I
>>>>>>> wanted to know whether it is necessary to pass a child context and
>>>>>>> cancel it for the stream to actually close. Is CloseSend() not
>>>>>>> sufficient if the context is still alive?
>>>>>>>
>>>>>>> log.Println("stream starting")
>>>>>>> streamCtx, cancel := context.WithCancel(ctx)
>>>>>>> defer cancel()
>>>>>>> stream, err := grpcStream(streamCtx, otherArgs)
>>>>>>> if err != nil {
>>>>>>>     errCh <- err
>>>>>>>     return
>>>>>>> }
>>>>>>> defer stream.CloseSend()
>>>>>>> defer log.Println("closing stream")
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> --
>>>>>>> You received this message because you are subscribed to the Google
>>>>>>> Groups "grpc.io" group.
>>>>>>> To unsubscribe from this group and stop receiving emails from it,
>>>>>>> send an email to [email protected].
>>>>>>> To post to this group, send email to [email protected].
>>>>>>> Visit this group at https://groups.google.com/group/grpc-io.
>>>>>>> To view this discussion on the web visit
>>>>>>> https://groups.google.com/d/msgid/grpc-io/85470940-69e3-4f4c-aed2-31a3242841a3%40googlegroups.com
>>>>>>>
>>>>>>> <https://groups.google.com/d/msgid/grpc-io/85470940-69e3-4f4c-aed2-31a3242841a3%40googlegroups.com?utm_medium=email&utm_source=footer>
>>>>>>> .
>>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>
>