Hi, I develop a desktop application that allows transmission of files
between computers on a local network.
A recent new feature I added was to add compression during the transfer.
This seemed like an ideal candidate for using interceptors to handle
compression/decompression which would handle the stream transparently (and
allow simple passthru when both clients didn't support or want to use
compression).
The problem I've run into is when the client (receiver) wants to cancel the
transfer. Prior to adding the intercept layer, I simply called
reponse.cancel() which would raise a cancel exception on the server.
With the addition of my client-side interceptor (to decompress the stream
just before the 'real' client receives it), I'm returning a simple
generator instead, which is not a Future like the original response. What
I'm doing at the moment is returning both the new generator as well as the
original response (future) as a 'cancellable' object. This is ugly because
now the stub method 'StartTransfer' no longer returns a single object like
the proto definition declares:
...
rpc StartTransfer(OpInfo) returns (stream FileChunk) {}
...
but I'm actually calling this:
...
op.file_iter_cancellable, op.file_iterator = self.stub.StartTransfer( ...
...
Here is the client interceptor to show my current workaround:
class ChunkDecompressor(grpc.UnaryStreamClientInterceptor):
def __init__(self):
pass
# Intercept the RPC response after it returns from the server but
before it reaches
# the remote.
def intercept_unary_stream(self, continuation, client_call_details,
request):
# Only intercept transfer ops.
if client_call_details.method != "/Warp/StartTransfer":
return continuation(client_call_details, request)
try:
use_comp = request.use_compression
except AttributeError:
use_comp = False
logging.debug("Transfer using compression: %d" % use_comp)
# When always need to return the original response along with
# whatever the remote will be iterating over. If there's no
# compression, we just return the same response twice.
if not use_comp:
response = continuation(client_call_details, request)
return response, response
def decomp_stream(response):
# Inbound response (returned from continuation())
for chunk in response:
try:
if not chunk.chunk:
yield chunk
else:
dcchunk = zlib.decompress(chunk.chunk)
chunk.chunk = dcchunk
yield chunk
except Exception as e:
logging.warning("Decompression error: %s" % e)
# this will go to remote.start_transfer_op()'s handler.
raise
response = continuation(client_call_details, request)
# With compression, decomp_stream returns a simple generator
# function. The response is still needed for its future.cancel()
# method.
return response, decomp_stream(response)
Is there some other way I can accomplish this? I'm fine keeping it this way
- this is not any sort of public api. I just have a low-level discomfort
about my workaround and that usually means I'm doing it wrong :)
Thanks!
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/2f13eeea-0d67-40ae-b805-e659d8022bf8n%40googlegroups.com.