For completeness, I ended up with this class to replace the original
response:
class StreamReponseWrapper():
def __init__(self, response):
self.response = response
def __iter__(self):
return self
def __next__(self):
chunk = self.response.__next__()
try:
if not chunk.chunk:
return chunk
else:
dcchunk = zlib.decompress(chunk.chunk)
chunk.chunk = dcchunk
return chunk
except Exception as e:
logging.warning("Decompression error: %s" % e)
# this will go to remote.start_transfer_op()'s handler.
raise
# Future
def cancel(self):
self.response.cancel()
On Sunday, January 2, 2022 at 8:38:35 PM UTC-5 [email protected] wrote:
> I realized the I can create a generator class to wrap the origin response
> and expose a cancel() method as well. I noticed the documentation didn't
> seem to be all that strict about intercept return types.
>
> Thanks for the help! :)
> On Sunday, January 2, 2022 at 11:23:17 AM UTC-5 [email protected] wrote:
>
>> Hi, I develop a desktop application that allows transmission of files
>> between computers on a local network.
>>
>> A recent new feature I added was to add compression during the transfer.
>> This seemed like an ideal candidate for using interceptors to handle
>> compression/decompression which would handle the stream transparently (and
>> allow simple passthru when both clients didn't support or want to use
>> compression).
>>
>> The problem I've run into is when the client (receiver) wants to cancel
>> the transfer. Prior to adding the intercept layer, I simply called
>> reponse.cancel() which would raise a cancel exception on the server.
>>
>> With the addition of my client-side interceptor (to decompress the stream
>> just before the 'real' client receives it), I'm returning a simple
>> generator instead, which is not a Future like the original response. What
>> I'm doing at the moment is returning both the new generator as well as the
>> original response (future) as a 'cancellable' object. This is ugly because
>> now the stub method 'StartTransfer' no longer returns a single object like
>> the proto definition declares:
>> ...
>> rpc StartTransfer(OpInfo) returns (stream FileChunk) {}
>> ...
>> but I'm actually calling this:
>> ...
>> op.file_iter_cancellable, op.file_iterator = self.stub.StartTransfer( ...
>> ...
>>
>> Here is the client interceptor to show my current workaround:
>>
>> class ChunkDecompressor(grpc.UnaryStreamClientInterceptor):
>> def __init__(self):
>> pass
>>
>> # Intercept the RPC response after it returns from the server but
>> before it reaches
>> # the remote.
>> def intercept_unary_stream(self, continuation, client_call_details,
>> request):
>> # Only intercept transfer ops.
>> if client_call_details.method != "/Warp/StartTransfer":
>> return continuation(client_call_details, request)
>>
>> try:
>> use_comp = request.use_compression
>> except AttributeError:
>> use_comp = False
>>
>> logging.debug("Transfer using compression: %d" % use_comp)
>>
>> # When always need to return the original response along with
>> # whatever the remote will be iterating over. If there's no
>> # compression, we just return the same response twice.
>> if not use_comp:
>> response = continuation(client_call_details, request)
>> return response, response
>>
>> def decomp_stream(response):
>> # Inbound response (returned from continuation())
>> for chunk in response:
>> try:
>> if not chunk.chunk:
>> yield chunk
>> else:
>> dcchunk = zlib.decompress(chunk.chunk)
>> chunk.chunk = dcchunk
>> yield chunk
>> except Exception as e:
>> logging.warning("Decompression error: %s" % e)
>> # this will go to remote.start_transfer_op()'s
>> handler.
>> raise
>>
>> response = continuation(client_call_details, request)
>>
>> # With compression, decomp_stream returns a simple generator
>> # function. The response is still needed for its future.cancel()
>> # method.
>>
>> return response, decomp_stream(response)
>>
>> Is there some other way I can accomplish this? I'm fine keeping it this
>> way - this is not any sort of public api. I just have a low-level
>> discomfort about my workaround and that usually means I'm doing it wrong :)
>>
>> Thanks!
>>
>
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/87085e68-ab0a-43bf-9438-67d3a2e1cfc1n%40googlegroups.com.