I realized the I can create a generator class to wrap the origin response and expose a cancel() method as well. I noticed the documentation didn't seem to be all that strict about intercept return types.
Thanks for the help! :) On Sunday, January 2, 2022 at 11:23:17 AM UTC-5 [email protected] wrote: > Hi, I develop a desktop application that allows transmission of files > between computers on a local network. > > A recent new feature I added was to add compression during the transfer. > This seemed like an ideal candidate for using interceptors to handle > compression/decompression which would handle the stream transparently (and > allow simple passthru when both clients didn't support or want to use > compression). > > The problem I've run into is when the client (receiver) wants to cancel > the transfer. Prior to adding the intercept layer, I simply called > reponse.cancel() which would raise a cancel exception on the server. > > With the addition of my client-side interceptor (to decompress the stream > just before the 'real' client receives it), I'm returning a simple > generator instead, which is not a Future like the original response. What > I'm doing at the moment is returning both the new generator as well as the > original response (future) as a 'cancellable' object. This is ugly because > now the stub method 'StartTransfer' no longer returns a single object like > the proto definition declares: > ... > rpc StartTransfer(OpInfo) returns (stream FileChunk) {} > ... > but I'm actually calling this: > ... > op.file_iter_cancellable, op.file_iterator = self.stub.StartTransfer( ... > ... > > Here is the client interceptor to show my current workaround: > > class ChunkDecompressor(grpc.UnaryStreamClientInterceptor): > def __init__(self): > pass > > # Intercept the RPC response after it returns from the server but > before it reaches > # the remote. > def intercept_unary_stream(self, continuation, client_call_details, > request): > # Only intercept transfer ops. > if client_call_details.method != "/Warp/StartTransfer": > return continuation(client_call_details, request) > > try: > use_comp = request.use_compression > except AttributeError: > use_comp = False > > logging.debug("Transfer using compression: %d" % use_comp) > > # When always need to return the original response along with > # whatever the remote will be iterating over. If there's no > # compression, we just return the same response twice. > if not use_comp: > response = continuation(client_call_details, request) > return response, response > > def decomp_stream(response): > # Inbound response (returned from continuation()) > for chunk in response: > try: > if not chunk.chunk: > yield chunk > else: > dcchunk = zlib.decompress(chunk.chunk) > chunk.chunk = dcchunk > yield chunk > except Exception as e: > logging.warning("Decompression error: %s" % e) > # this will go to remote.start_transfer_op()'s handler. > raise > > response = continuation(client_call_details, request) > > # With compression, decomp_stream returns a simple generator > # function. The response is still needed for its future.cancel() > # method. > > return response, decomp_stream(response) > > Is there some other way I can accomplish this? I'm fine keeping it this > way - this is not any sort of public api. I just have a low-level > discomfort about my workaround and that usually means I'm doing it wrong :) > > Thanks! > -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/4c3cd712-89e6-4ce4-ae50-aa0cdc1a36c4n%40googlegroups.com.
