This is an automated email from the ASF dual-hosted git repository. rskraba pushed a commit to branch branch-1.9 in repository https://gitbox.apache.org/repos/asf/avro.git
commit 4006a7643c28a658bf2585761f64b4095aa51a71 Author: Michael A. Smith <[email protected]> AuthorDate: Sun Dec 1 06:18:58 2019 -0500 AVRO-2595: Remove Unusable Py3 Txipc (#716) The Py3 package will be superseded by python --- lang/py3/avro/tests/txsample_http_client.py | 106 ------------- lang/py3/avro/tests/txsample_http_server.py | 71 --------- lang/py3/avro/txipc.py | 223 ---------------------------- 3 files changed, 400 deletions(-) diff --git a/lang/py3/avro/tests/txsample_http_client.py b/lang/py3/avro/tests/txsample_http_client.py deleted file mode 100644 index a536aa8..0000000 --- a/lang/py3/avro/tests/txsample_http_client.py +++ /dev/null @@ -1,106 +0,0 @@ -#!/usr/bin/env python3 -# -*- mode: python -*- -# -*- coding: utf-8 -*- - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import sys - -from avro import protocol, txipc -from twisted.internet import defer, reactor -from twisted.python.util import println - -MAIL_PROTOCOL_JSON = """\ -{"namespace": "example.proto", - "protocol": "Mail", - - "types": [ - {"name": "Message", "type": "record", - "fields": [ - {"name": "to", "type": "string"}, - {"name": "from", "type": "string"}, - {"name": "body", "type": "string"} - ] - } - ], - - "messages": { - "send": { - "request": [{"name": "message", "type": "Message"}], - "response": "string" - }, - "replay": { - "request": [], - "response": "string" - } - } -} -""" -MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON) -SERVER_HOST = 'localhost' -SERVER_PORT = 9090 - -class UsageError(Exception): - def __init__(self, value): - self.value = value - def __str__(self): - return repr(self.value) - -def make_requestor(server_host, server_port, protocol): - client = txipc.TwistedHTTPTransceiver(SERVER_HOST, SERVER_PORT) - return txipc.TwistedRequestor(protocol, client) - -if __name__ == '__main__': - if len(sys.argv) not in [4, 5]: - raise UsageError("Usage: <to> <from> <body> [<count>]") - - # client code - attach to the server and send a message - # fill in the Message record - message = dict() - message['to'] = sys.argv[1] - message['from'] = sys.argv[2] - message['body'] = sys.argv[3] - - try: - num_messages = int(sys.argv[4]) - except IndexError: - num_messages = 1 - - # build the parameters for the request - params = {} - params['message'] = message - - requests = [] - # send the requests and print the result - for msg_count in range(num_messages): - requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL) - d = requestor.request('send', params) - d.addCallback(lambda result: println("Result: " + result)) - requests.append(d) - results = defer.gatherResults(requests) - - def replay_cb(result): - print("Replay Result: " + result) - reactor.stop() - - def replay(_): - # try out a replay message - requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL) - d = requestor.request('replay', dict()) - d.addCallback(replay_cb) - - results.addCallback(replay) - reactor.run() diff --git a/lang/py3/avro/tests/txsample_http_server.py b/lang/py3/avro/tests/txsample_http_server.py deleted file mode 100644 index beb1267..0000000 --- a/lang/py3/avro/tests/txsample_http_server.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python3 -# -*- mode: python -*- -# -*- coding: utf-8 -*- - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software - - -from avro import ipc, protocol, txipc -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -from twisted.internet import reactor -# See the License for the specific language governing permissions and -# limitations under the License. -from twisted.web import server - -MAIL_PROTOCOL_JSON = """\ -{"namespace": "example.proto", - "protocol": "Mail", - - "types": [ - {"name": "Message", "type": "record", - "fields": [ - {"name": "to", "type": "string"}, - {"name": "from", "type": "string"}, - {"name": "body", "type": "string"} - ] - } - ], - - "messages": { - "send": { - "request": [{"name": "message", "type": "Message"}], - "response": "string" - }, - "replay": { - "request": [], - "response": "string" - } - } -} -""" -MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON) -SERVER_ADDRESS = ('localhost', 9090) - -class MailResponder(ipc.Responder): - def __init__(self): - ipc.Responder.__init__(self, MAIL_PROTOCOL) - - def invoke(self, message, request): - if message.name == 'send': - request_content = request['message'] - response = "Sent message to %(to)s from %(from)s with body %(body)s" % \ - request_content - return response - elif message.name == 'replay': - return 'replay' - -if __name__ == '__main__': - root = server.Site(txipc.AvroResponderResource(MailResponder())) - reactor.listenTCP(9090, root) - reactor.run() diff --git a/lang/py3/avro/txipc.py b/lang/py3/avro/txipc.py deleted file mode 100644 index d8e3225..0000000 --- a/lang/py3/avro/txipc.py +++ /dev/null @@ -1,223 +0,0 @@ -#!/usr/bin/env python3 -# -*- mode: python -*- -# -*- coding: utf-8 -*- - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import io - -from zope.interface import implements - -from avro import io as avro_io -from avro import ipc -from twisted.internet.defer import Deferred, maybeDeferred -from twisted.internet.protocol import Protocol -from twisted.web import resource, server -from twisted.web.client import Agent -from twisted.web.http_headers import Headers -from twisted.web.iweb import IBodyProducer - - -class TwistedRequestor(ipc.BaseRequestor): - """A Twisted-compatible requestor. Returns a Deferred that will fire with the - returning value, instead of blocking until the request completes.""" - def _process_handshake(self, call_response, message_name, request_datum): - # process the handshake and call response - buffer_decoder = avro_io.BinaryDecoder(io.StringIO(call_response)) - call_response_exists = self.read_handshake_response(buffer_decoder) - if call_response_exists: - return self.read_call_response(message_name, buffer_decoder) - else: - return self.request(message_name, request_datum) - - def issue_request(self, call_request, message_name, request_datum): - d = self.transceiver.transceive(call_request) - d.addCallback(self._process_handshake, message_name, request_datum) - return d - -class RequestStreamingProducer(object): - """A streaming producer for issuing requests with the Twisted.web Agent.""" - implements(IBodyProducer) - - paused = False - stopped = False - started = False - - def __init__(self, message): - self._message = message - self._length = len(message) - # We need a buffer length header for every buffer and an additional - # zero-length buffer as the message terminator - self._length += (self._length / ipc.BUFFER_SIZE + 2) \ - * ipc.BUFFER_HEADER_LENGTH - self._total_bytes_sent = 0 - self._deferred = Deferred() - - # read-only properties - message = property(lambda self: self._message) - length = property(lambda self: self._length) - consumer = property(lambda self: self._consumer) - deferred = property(lambda self: self._deferred) - - def _get_total_bytes_sent(self): - return self._total_bytes_sent - - def _set_total_bytes_sent(self, bytes_sent): - self._total_bytes_sent = bytes_sent - - total_bytes_sent = property(_get_total_bytes_sent, _set_total_bytes_sent) - - def startProducing(self, consumer): - if self.started: - return - - self.started = True - self._consumer = consumer - # Keep writing data to the consumer until we're finished, - # paused (pauseProducing()) or stopped (stopProducing()) - while self.length - self.total_bytes_sent > 0 and \ - not self.paused and not self.stopped: - self.write() - # self.write will fire this deferred once it has written - # the entire message to the consumer - return self.deferred - - def resumeProducing(self): - self.paused = False - self.write(self) - - def pauseProducing(self): - self.paused = True - - def stopProducing(self): - self.stopped = True - - def write(self): - if self.length - self.total_bytes_sent > ipc.BUFFER_SIZE: - buffer_length = ipc.BUFFER_SIZE - else: - buffer_length = self.length - self.total_bytes_sent - self.write_buffer(self.message[self.total_bytes_sent: - (self.total_bytes_sent + buffer_length)]) - self.total_bytes_sent += buffer_length - # Make sure we wrote the entire message - if self.total_bytes_sent == self.length and not self.stopped: - self.stopProducing() - # A message is always terminated by a zero-length buffer. - self.write_buffer_length(0) - self.deferred.callback(None) - - def write_buffer(self, chunk): - buffer_length = len(chunk) - self.write_buffer_length(buffer_length) - self.consumer.write(chunk) - - def write_buffer_length(self, n): - self.consumer.write(ipc.BIG_ENDIAN_INT_STRUCT.pack(n)) - -class AvroProtocol(Protocol): - - recvd = '' - done = False - - def __init__(self, finished): - self.finished = finished - self.message = [] - - def dataReceived(self, data): - self.recvd = self.recvd + data - while len(self.recvd) >= ipc.BUFFER_HEADER_LENGTH: - buffer_length ,= ipc.BIG_ENDIAN_INT_STRUCT.unpack( - self.recvd[:ipc.BUFFER_HEADER_LENGTH]) - if buffer_length == 0: - response = ''.join(self.message) - self.done = True - self.finished.callback(response) - break - if len(self.recvd) < buffer_length + ipc.BUFFER_HEADER_LENGTH: - break - buffer = self.recvd[ipc.BUFFER_HEADER_LENGTH:buffer_length + ipc.BUFFER_HEADER_LENGTH] - self.recvd = self.recvd[buffer_length + ipc.BUFFER_HEADER_LENGTH:] - self.message.append(buffer) - - def connectionLost(self, reason): - if not self.done: - self.finished.errback(ipc.ConnectionClosedException("Reader read 0 bytes.")) - -class TwistedHTTPTransceiver(object): - """This transceiver uses the Agent class present in Twisted.web >= 9.0 - for issuing requests to the remote endpoint.""" - def __init__(self, host, port, remote_name=None, reactor=None): - self.url = "http://%s:%d/" % (host, port) - - if remote_name is None: - # There's no easy way to get this peer's remote address - # in Twisted so I use a random UUID to identify ourselves - import uuid - self.remote_name = uuid.uuid4() - - if reactor is None: - from twisted.internet import reactor - self.agent = Agent(reactor) - - def read_framed_message(self, response): - finished = Deferred() - response.deliverBody(AvroProtocol(finished)) - return finished - - def transceive(self, request): - req_method = 'POST' - req_headers = { - 'Content-Type': ['avro/binary'], - 'Accept-Encoding': ['identity'], - } - - body_producer = RequestStreamingProducer(request) - d = self.agent.request( - req_method, - self.url, - headers=Headers(req_headers), - bodyProducer=body_producer) - return d.addCallback(self.read_framed_message) - -class AvroResponderResource(resource.Resource): - """This Twisted.web resource can be placed anywhere in a URL hierarchy - to provide an Avro endpoint. Different Avro protocols can be served - by the same web server as long as they are in different resources in - a URL hierarchy.""" - isLeaf = True - - def __init__(self, responder): - resource.Resource.__init__(self) - self.responder = responder - - def cb_render_POST(self, resp_body, request): - request.setResponseCode(200) - request.setHeader('Content-Type', 'avro/binary') - resp_writer = ipc.FramedWriter(request) - resp_writer.write_framed_message(resp_body) - request.finish() - - def render_POST(self, request): - # Unfortunately, Twisted.web doesn't support incoming - # streamed input yet, the whole payload must be kept in-memory - request.content.seek(0, 0) - call_request_reader = ipc.FramedReader(request.content) - call_request = call_request_reader.read_framed_message() - d = maybeDeferred(self.responder.respond, call_request) - d.addCallback(self.cb_render_POST, request) - return server.NOT_DONE_YET
