I did mean server-streaming, sorry for the confusion. And awesome, that's 
exactly what I was trying to figure out, thanks!

As for the Python part, here is a snippet:

    def GrpcService(self, request, context):
        arg = request.arg
        data_stream = some_function(arg)

        # Callback to free up resources once the RPC terminates
        context.add_callback(data_stream.close)

        for data in data_stream:
            # Serve data indefinitely
            yield data

The add_callback() is from 
http://www.grpc.io/grpc/python/grpc.html?highlight=add_callback#grpc.RpcContext.add_callback
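
To make the expected behaviour concrete, here is a minimal sketch that runs without gRPC installed. FakeContext and DataStream are hypothetical stand-ins I made up for illustration; they are not gRPC classes. The only thing borrowed from the real API is the documented add_callback() contract (callbacks run when the RPC terminates, and the method returns False if the RPC has already terminated). terminate() plays the role of the Go client cancelling its context.

```python
class FakeContext:
    """Hypothetical stand-in for the servicer context; NOT the real gRPC class."""

    def __init__(self):
        self._callbacks = []
        self._active = True

    def add_callback(self, callback):
        # Same contract as the documented method: refuse (return False)
        # if the RPC has already terminated.
        if not self._active:
            return False
        self._callbacks.append(callback)
        return True

    def terminate(self):
        # Simulates RPC termination, e.g. the client cancelling its context.
        if not self._active:
            return
        self._active = False
        for callback in self._callbacks:
            callback()


class DataStream:
    """Hypothetical endless data source that stops once close() is called."""

    def __init__(self):
        self.closed = False
        self._counter = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.closed:
            raise StopIteration
        self._counter += 1
        return self._counter

    def close(self):
        self.closed = True


def serve(data_stream, context):
    # Same shape as the handler above: register cleanup, then stream data.
    context.add_callback(data_stream.close)
    for data in data_stream:
        yield data


def demo():
    context = FakeContext()
    stream = DataStream()
    responses = serve(stream, context)
    received = [next(responses) for _ in range(3)]
    context.terminate()  # the "client" cancels here
    remaining = list(responses)  # the generator ends because the source closed
    return received, stream.closed, remaining
```

Calling demo() reads three items, cancels, and then finds the data source closed and the response generator exhausted. In the real server the termination signal would come from the client cancelling, not from a half-close.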

On Monday, February 6, 2017 at 3:26:23 PM UTC-6, Josh Humphries wrote:
>
> On Mon, Feb 6, 2017 at 1:49 PM, Michael Bond <[email protected]> wrote:
>
>> This is a unary to stream, not stream to stream rpc, if that changes 
>> anything.
>>
>
> Do you mean server-streaming (e.g. unary request message, streaming 
> response)? If that is the case, calling stream.CloseSend() has no effect as 
> the stream is already closed. Take a look at the generated code for a 
> server-streaming method, and you'll see that the generated stub calls 
> SendMsg() and CloseSend() for you: 
> https://github.com/grpc/grpc-go/blob/883bfc7bc8feeb7d90501b977e1c23447b9ff136/test/grpc_testing/test.pb.go#L416
>  
>
>> So would calling CloseSend() send a message to the python server? Then 
>> would I just need to handle said message to make sure the callbacks are 
>> executed? 
>>
>
>> Basic flow:
>> Go client opens stream to python server with args
>> Python server dumps back data to go client while the client reads it
>> Go client no longer needs particular data and closes the stream
>>
>
> If the client no longer cares about the response stream and has already 
> finished sending request messages, then cancelling the context is the 
> appropriate action to take.
>  
>
>> Python server executes callback functions to clean up resources <- this 
>> is what currently wasn't happening with CloseSend() until I added a child 
>> context and cancelled it
>>
>
> I still don't quite understand how you've got this wired up. A code sample 
> of your Python server code might help. But, in any event, it sounds like 
> you genuinely need the client to cancel.
>
> Another possibility could be to use bi-di streaming and then have the
> server poll for request messages and do the cleanup when it reaches the
> end of the request stream. But it sounds like that might only add needless
> complexity to the server endpoint.
>  
>
>>
>> On Monday, February 6, 2017 at 12:24:33 PM UTC-6, Josh Humphries wrote:
>>
>>> On Mon, Feb 6, 2017 at 1:19 PM, Michael Bond <[email protected]> 
>>> wrote:
>>>
>>>> Thanks for the quick reply. Should have specified that the code in the
>>>> original post is a snippet; there's receiving logic underneath it.
>>>>
>>>> Some more details surrounding this. In this case I have a callback
>>>> function on the server (written in Python) that needs to be executed to
>>>> free resources, and closing the sending portion does not seem to trigger
>>>> that. Also, the client in this case dictates all connections. The server
>>>> simply pours a stream of data to the client until the client no longer
>>>> needs that particular data. So, to be more specific with my question: how
>>>> would I fully close the stream from the client's side?
>>>>
>>>
>>> Not sure I follow 100%. But the python code should have similar logic 
>>> where it is receiving the request messages. When the client half-closes the 
>>> stream, the server would get EOF trying to receive (or if python APIs were 
>>> async/push, like the Java APIs are, you'd get an "end of stream" 
>>> notification). Is that where you are doing the clean up?
>>>
>>>  
>>>
>>>>
>>>> On Monday, February 6, 2017 at 9:50:04 AM UTC-6, Josh Humphries wrote:
>>>>>
>>>>> Perhaps more helpful: in your code example, you would then consume the
>>>>> responses by calling Recv() on the stream until it returns an error
>>>>> (io.EOF on successful end of stream or some other error if the call
>>>>> fails). Even if you are not expecting any response data from the server,
>>>>> you want to call Recv() in order to learn the ultimate disposition of the
>>>>> call (did it result in an error in the server or was it processed
>>>>> successfully?).
>>>>>
>>>>> log.Println("stream starting")
>>>>> streamCtx, cancel := context.WithCancel(ctx)
>>>>> defer cancel()
>>>>> stream, err := grpcStream(streamCtx, otherArgs)
>>>>> if err != nil {
>>>>>     errCh <- err
>>>>>     return
>>>>> }
>>>>> defer stream.CloseSend()
>>>>> defer log.Println("closing stream")
>>>>> for {
>>>>>     _, err := stream.Recv() // message discarded; an unused "msg" would not compile
>>>>>     if err == io.EOF {
>>>>>         break
>>>>>     }
>>>>>     if err != nil {
>>>>>         errCh <- err
>>>>>         return
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> ----
>>>>>
>>>>> Josh Humphries
>>>>>
>>>>> FullStory <https://www.fullstory.com/>  |  Atlanta, GA
>>>>>
>>>>> Software Engineer
>>>>>
>>>>> [email protected]
>>>>>
>>>>> On Mon, Feb 6, 2017 at 10:36 AM, Josh Humphries <[email protected]> 
>>>>> wrote:
>>>>>
>>>>>> On the client, the CloseSend method is "half-closing" the stream. So
>>>>>> it closes the request/upload half of the stream. The stream remains open
>>>>>> until the server closes the other half: the response/download part of
>>>>>> the stream. Cancelling the stream also closes it (as would the channel
>>>>>> being disconnected or the call timing out).
>>>>>>
>>>>>> On Mon, Feb 6, 2017 at 10:29 AM, Michael Bond <[email protected]> 
>>>>>> wrote:
>>>>>>
>>>>>>> Hey, trying to make sure I'm doing this correctly.
>>>>>>>
>>>>>>> Right now I'm having issues with closing streams started with a
>>>>>>> context that is passed around and exists for quite a while.
>>>>>>>
>>>>>>> In this example "ctx" is passed around to many goroutines. I want
>>>>>>> to keep "ctx" around, but passing it to "grpcStream" seems to keep the
>>>>>>> stream from actually closing. What I did below fixed the issue, but I
>>>>>>> wanted to know whether it is necessary to pass a child context and
>>>>>>> cancel it for the stream to actually close. Is CloseSend() not
>>>>>>> sufficient if the context is still alive?
>>>>>>>
>>>>>>> log.Println("stream starting")
>>>>>>> streamCtx, cancel := context.WithCancel(ctx)
>>>>>>> defer cancel()
>>>>>>> stream, err := grpcStream(streamCtx, otherArgs)
>>>>>>> if err != nil {
>>>>>>>     errCh <- err
>>>>>>>     return
>>>>>>> }
>>>>>>> defer stream.CloseSend()
>>>>>>> defer log.Println("closing stream")
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> -- 
>>>>>>> You received this message because you are subscribed to the Google 
>>>>>>> Groups "grpc.io" group.
>>>>>>> To unsubscribe from this group and stop receiving emails from it, 
>>>>>>> send an email to [email protected].
>>>>>>> To post to this group, send email to [email protected].
>>>>>>> Visit this group at https://groups.google.com/group/grpc-io.
>>>>>>> To view this discussion on the web visit 
>>>>>>> https://groups.google.com/d/msgid/grpc-io/85470940-69e3-4f4c-aed2-31a3242841a3%40googlegroups.com
>>>>>>>  
>>>>>>> <https://groups.google.com/d/msgid/grpc-io/85470940-69e3-4f4c-aed2-31a3242841a3%40googlegroups.com?utm_medium=email&utm_source=footer>
>>>>>>> .
>>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>
>

