I realized the I can create a generator class to wrap the origin response 
and expose a cancel() method as well. I noticed the documentation didn't 
seem to be all that strict about intercept return types.

Thanks for the help! :)
On Sunday, January 2, 2022 at 11:23:17 AM UTC-5 [email protected] wrote:

> Hi, I develop a desktop application that allows transmission of files 
> between computers on a local network.
>
> A recent new feature I added was to add compression during the transfer. 
> This seemed like an ideal candidate for using interceptors to handle 
> compression/decompression which would handle the stream transparently (and 
> allow simple passthru when both clients didn't support or want to use 
> compression).
>
> The problem I've run into is when the client (receiver) wants to cancel 
> the transfer. Prior to adding the intercept layer, I simply called 
> reponse.cancel() which would raise a cancel exception on the server.
>
> With the addition of my client-side interceptor (to decompress the stream 
> just before the 'real' client receives it), I'm returning a simple 
> generator instead, which is not a Future like the original response. What 
> I'm doing at the moment is returning both the new generator as well as the 
> original response (future) as a 'cancellable' object. This is ugly because 
> now the stub method 'StartTransfer' no longer returns a single object like 
> the proto definition declares:
> ...
> rpc StartTransfer(OpInfo) returns (stream FileChunk) {}
> ...
> but I'm actually calling this:
> ...
> op.file_iter_cancellable, op.file_iterator = self.stub.StartTransfer( ...
> ...
>
> Here is the client interceptor to show my current workaround:
>
> class ChunkDecompressor(grpc.UnaryStreamClientInterceptor):
>     def __init__(self):
>         pass
>
>     # Intercept the RPC response after it returns from the server but 
> before it reaches
>     # the remote.
>     def intercept_unary_stream(self, continuation, client_call_details, 
> request):
>         # Only intercept transfer ops.
>         if client_call_details.method != "/Warp/StartTransfer":
>             return continuation(client_call_details, request)
>
>         try:
>             use_comp = request.use_compression
>         except AttributeError:
>             use_comp = False
>
>         logging.debug("Transfer using compression: %d" % use_comp)
>
>         # When always need to return the original response along with
>         # whatever the remote will be iterating over. If there's no
>         # compression, we just return the same response twice.
>         if not use_comp:
>             response = continuation(client_call_details, request)
>             return response, response
>
>         def decomp_stream(response):
>             # Inbound response (returned from continuation())
>             for chunk in response:
>                 try:
>                     if not chunk.chunk:
>                         yield chunk
>                     else:
>                         dcchunk = zlib.decompress(chunk.chunk)
>                         chunk.chunk = dcchunk
>                         yield chunk
>                 except Exception as e:
>                     logging.warning("Decompression error: %s" % e)
>                     # this will go to remote.start_transfer_op()'s handler.
>                     raise
>
>         response = continuation(client_call_details, request)
>
>         # With compression, decomp_stream returns a simple generator
>         # function. The response is still needed for its future.cancel()
>         # method.
>
>         return response, decomp_stream(response)
>
> Is there some other way I can accomplish this? I'm fine keeping it this 
> way - this is not any sort of public api. I just have a low-level 
> discomfort about my workaround and that usually means I'm doing it wrong :)
>
> Thanks!
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/4c3cd712-89e6-4ce4-ae50-aa0cdc1a36c4n%40googlegroups.com.

Reply via email to