Hi, I develop a desktop application that allows transmission of files 
between computers on a local network.

A recent feature I added is compression during the transfer. This seemed 
like an ideal candidate for interceptors: they can handle 
compression/decompression transparently on the stream (and fall back to a 
simple passthrough when the two sides don't both support, or want to use, 
compression).
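For concreteness, the per-chunk scheme is just zlib over each FileChunk 
payload. A minimal round-trip sketch (helper names here are illustrative, 
not my actual code):

```python
import zlib

# Illustrative helpers (not the actual app code): the sender deflates each
# chunk's payload, and the client-side interceptor inflates it again
# before the application code ever sees the stream.
def compress_chunk(payload: bytes) -> bytes:
    return zlib.compress(payload)

def decompress_chunk(payload: bytes) -> bytes:
    return zlib.decompress(payload)

data = b"some file data " * 100
assert decompress_chunk(compress_chunk(data)) == data
```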

The problem I've run into is when the client (receiver) wants to cancel 
the transfer. Prior to adding the interceptor layer, I simply called 
response.cancel(), which would raise a cancel exception on the server.

With the addition of my client-side interceptor (to decompress the stream 
just before the 'real' client receives it), I'm returning a plain 
generator instead, which is not a Future like the original response. What 
I'm doing at the moment is returning both the new generator and the 
original response (the Future) as a 'cancellable' object. This is ugly, 
because the stub method 'StartTransfer' no longer returns a single object 
as the proto definition declares:
...
rpc StartTransfer(OpInfo) returns (stream FileChunk) {}
...
but I'm actually calling this:
...
op.file_iter_cancellable, op.file_iterator = self.stub.StartTransfer( ...
...

Here is the client interceptor to show my current workaround:

import logging
import zlib

import grpc


class ChunkDecompressor(grpc.UnaryStreamClientInterceptor):
    def __init__(self):
        pass

    # Intercept the RPC response after it returns from the server but
    # before it reaches the remote.
    def intercept_unary_stream(self, continuation, client_call_details, request):
        # Only intercept transfer ops.
        if client_call_details.method != "/Warp/StartTransfer":
            return continuation(client_call_details, request)

        try:
            use_comp = request.use_compression
        except AttributeError:
            use_comp = False

        logging.debug("Transfer using compression: %d" % use_comp)

        # We always need to return the original response along with
        # whatever the remote will be iterating over. If there's no
        # compression, we just return the same response twice.
        if not use_comp:
            response = continuation(client_call_details, request)
            return response, response

        def decomp_stream(response):
            # Inbound response (returned from continuation())
            for chunk in response:
                if not chunk.chunk:
                    yield chunk
                    continue
                try:
                    chunk.chunk = zlib.decompress(chunk.chunk)
                except Exception as e:
                    logging.warning("Decompression error: %s" % e)
                    # this will go to remote.start_transfer_op()'s handler.
                    raise
                yield chunk

        response = continuation(client_call_details, request)

        # With compression, decomp_stream(response) returns a plain
        # generator. The response object is still needed for its
        # future.cancel() method.

        return response, decomp_stream(response)

Is there some other way I can accomplish this? I'm fine keeping it this 
way - this is not any sort of public API - but I have a low-level 
discomfort about the workaround, and that usually means I'm doing it 
wrong :)
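For what it's worth, one direction I've been toying with (a sketch only, 
not tested against my real transfer code - the class and attribute names 
are mine, not gRPC API): wrap the stream in a small object that iterates 
like the generator but forwards cancel() to the underlying call, so 
StartTransfer's result can be passed around as a single value again.

```python
class CancellableStream:
    """Pairs an iterator with the original call object so the receiver
    gets one value that can be both iterated and cancelled.
    (Sketch only; not part of the gRPC API.)"""

    def __init__(self, call, iterator=None):
        self._call = call  # original response; has .cancel()
        # With no compression, just iterate the call itself.
        self._iterator = iterator if iterator is not None else iter(call)

    def __iter__(self):
        return self._iterator

    def cancel(self):
        # Forward cancellation to the underlying grpc.Future.
        return self._call.cancel()
```

The no-compression path would then return CancellableStream(response) and 
the compression path CancellableStream(response, decomp_stream(response)), 
so the interceptor hands back a single object either way.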

Thanks!

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.