For completeness, I ended up with this class to replace the original 
response:

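import logging
import zlib

class StreamResponseWrapper:
    def __init__(self, response):
        # The original response returned by continuation(); it is both an
        # iterator over chunks and a Future.
        self.response = response

    def __iter__(self):
        return self

    def __next__(self):
        # StopIteration from the original response propagates unchanged.
        chunk = next(self.response)

        try:
            if not chunk.chunk:
                return chunk
            else:
                dcchunk = zlib.decompress(chunk.chunk)
                chunk.chunk = dcchunk
                return chunk
        except Exception as e:
            logging.warning("Decompression error: %s" % e)
            # this will go to remote.start_transfer_op()'s handler.
            raise

    # Forward cancel() to the original response (a Future).
    def cancel(self):
        self.response.cancel()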
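
As a rough sketch of how the same idea could be generalized, the variant below forwards every attribute it does not define itself to the original response, so cancel() and whatever else the call object offers stay reachable without writing one method per attribute. FakeChunk, FakeResponse and DecompressingResponse are hypothetical names made up for this illustration; the fake objects only mimic the parts of the real response used here (iteration and cancel()), and I have not tried this against a live channel.

```python
import zlib
from dataclasses import dataclass


@dataclass
class FakeChunk:
    # Hypothetical stand-in for the FileChunk message; only `chunk` is modelled.
    chunk: bytes


class FakeResponse:
    """Hypothetical stand-in for the object returned by continuation()."""

    def __init__(self, chunks):
        self._it = iter(chunks)
        self.cancelled = False

    def __iter__(self):
        return self

    def __next__(self):
        if self.cancelled:
            raise RuntimeError("stream cancelled")
        return next(self._it)

    def cancel(self):
        self.cancelled = True
        return True


class DecompressingResponse:
    """Iterates like the original response, but decompresses each chunk."""

    def __init__(self, response):
        self._response = response

    def __iter__(self):
        return self

    def __next__(self):
        # StopIteration from the wrapped response propagates and ends the loop.
        chunk = next(self._response)
        if chunk.chunk:
            chunk.chunk = zlib.decompress(chunk.chunk)
        return chunk

    def __getattr__(self, name):
        # Only called for attributes not found on the wrapper itself, so
        # cancel() and anything else is forwarded to the wrapped response.
        return getattr(self._response, name)


raw = FakeResponse([
    FakeChunk(zlib.compress(b"hello")),
    FakeChunk(b""),  # empty chunks pass through untouched
    FakeChunk(zlib.compress(b"world")),
])
stream = DecompressingResponse(raw)
first = next(stream)
second = next(stream)
stream.cancel()  # forwarded to raw.cancel() via __getattr__
```

With a wrapper like this the interceptor can return a single object again, so the stub call goes back to one return value that is both iterable and cancellable.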

On Sunday, January 2, 2022 at 8:38:35 PM UTC-5 [email protected] wrote:

> I realized that I can create a generator class to wrap the original response 
> and expose a cancel() method as well. I noticed the documentation didn't 
> seem to be all that strict about interceptor return types.
>
> Thanks for the help! :)
> On Sunday, January 2, 2022 at 11:23:17 AM UTC-5 [email protected] wrote:
>
>> Hi, I develop a desktop application that allows transmission of files 
>> between computers on a local network.
>>
>> I recently added compression during the transfer. This seemed like an 
>> ideal candidate for interceptors: they would handle 
>> compression/decompression of the stream transparently (and allow a simple 
>> passthrough when either client doesn't support, or doesn't want to use, 
>> compression).
>>
>> The problem I've run into is when the client (receiver) wants to cancel 
>> the transfer. Prior to adding the intercept layer, I simply called 
>> response.cancel() which would raise a cancel exception on the server.
>>
>> With the addition of my client-side interceptor (to decompress the stream 
>> just before the 'real' client receives it), I'm returning a simple 
>> generator instead, which is not a Future like the original response. What 
>> I'm doing at the moment is returning both the new generator as well as the 
>> original response (future) as a 'cancellable' object. This is ugly because 
>> now the stub method 'StartTransfer' no longer returns a single object like 
>> the proto definition declares:
>> ...
>> rpc StartTransfer(OpInfo) returns (stream FileChunk) {}
>> ...
>> but I'm actually calling this:
>> ...
>> op.file_iter_cancellable, op.file_iterator = self.stub.StartTransfer( ...
>> ...
>>
>> Here is the client interceptor to show my current workaround:
>>
>> class ChunkDecompressor(grpc.UnaryStreamClientInterceptor):
>>     def __init__(self):
>>         pass
>>
>>     # Intercept the RPC response after it returns from the server but
>>     # before it reaches the remote.
>>     def intercept_unary_stream(self, continuation, client_call_details, request):
>>         # Only intercept transfer ops.
>>         if client_call_details.method != "/Warp/StartTransfer":
>>             return continuation(client_call_details, request)
>>
>>         try:
>>             use_comp = request.use_compression
>>         except AttributeError:
>>             use_comp = False
>>
>>         logging.debug("Transfer using compression: %d" % use_comp)
>>
>>         # We always need to return the original response along with
>>         # whatever the remote will be iterating over. If there's no
>>         # compression, we just return the same response twice.
>>         if not use_comp:
>>             response = continuation(client_call_details, request)
>>             return response, response
>>
>>         def decomp_stream(response):
>>             # Inbound response (returned from continuation())
>>             for chunk in response:
>>                 try:
>>                     if not chunk.chunk:
>>                         yield chunk
>>                     else:
>>                         dcchunk = zlib.decompress(chunk.chunk)
>>                         chunk.chunk = dcchunk
>>                         yield chunk
>>                 except Exception as e:
>>                     logging.warning("Decompression error: %s" % e)
>>                     # this will go to remote.start_transfer_op()'s handler.
>>                     raise
>>
>>         response = continuation(client_call_details, request)
>>
>>         # With compression, decomp_stream() returns a plain generator.
>>         # The original response is still needed for its future.cancel()
>>         # method.
>>
>>         return response, decomp_stream(response)
>>
>> Is there some other way I can accomplish this? I'm fine keeping it this 
>> way - this is not any sort of public api. I just have a low-level 
>> discomfort about my workaround and that usually means I'm doing it wrong :)
>>
>> Thanks!
>>
>
