This is an automated email from the ASF dual-hosted git repository.

rskraba pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 4006a7643c28a658bf2585761f64b4095aa51a71
Author: Michael A. Smith <[email protected]>
AuthorDate: Sun Dec 1 06:18:58 2019 -0500

    AVRO-2595: Remove Unusable Py3 Txipc (#716)
    
    The Py3 package will be superseded by python
---
 lang/py3/avro/tests/txsample_http_client.py | 106 -------------
 lang/py3/avro/tests/txsample_http_server.py |  71 ---------
 lang/py3/avro/txipc.py                      | 223 ----------------------------
 3 files changed, 400 deletions(-)

diff --git a/lang/py3/avro/tests/txsample_http_client.py b/lang/py3/avro/tests/txsample_http_client.py
deleted file mode 100644
index a536aa8..0000000
--- a/lang/py3/avro/tests/txsample_http_client.py
+++ /dev/null
@@ -1,106 +0,0 @@
-#!/usr/bin/env python3
-# -*- mode: python -*-
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import sys
-
-from avro import protocol, txipc
-from twisted.internet import defer, reactor
-from twisted.python.util import println
-
-MAIL_PROTOCOL_JSON = """\
-{"namespace": "example.proto",
- "protocol": "Mail",
-
- "types": [
-     {"name": "Message", "type": "record",
-      "fields": [
-          {"name": "to",   "type": "string"},
-          {"name": "from", "type": "string"},
-          {"name": "body", "type": "string"}
-      ]
-     }
- ],
-
- "messages": {
-     "send": {
-         "request": [{"name": "message", "type": "Message"}],
-         "response": "string"
-     },
-     "replay": {
-         "request": [],
-         "response": "string"
-     }
- }
-}
-"""
-MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
-SERVER_HOST = 'localhost'
-SERVER_PORT = 9090
-
-class UsageError(Exception):
-  def __init__(self, value):
-    self.value = value
-  def __str__(self):
-    return repr(self.value)
-
-def make_requestor(server_host, server_port, protocol):
-  client = txipc.TwistedHTTPTransceiver(SERVER_HOST, SERVER_PORT)
-  return txipc.TwistedRequestor(protocol, client)
-
-if __name__ == '__main__':
-  if len(sys.argv) not in [4, 5]:
-    raise UsageError("Usage: <to> <from> <body> [<count>]")
-
-  # client code - attach to the server and send a message
-  # fill in the Message record
-  message = dict()
-  message['to'] = sys.argv[1]
-  message['from'] = sys.argv[2]
-  message['body'] = sys.argv[3]
-
-  try:
-    num_messages = int(sys.argv[4])
-  except IndexError:
-    num_messages = 1
-
-  # build the parameters for the request
-  params = {}
-  params['message'] = message
-
-  requests = []
-  # send the requests and print the result
-  for msg_count in range(num_messages):
-    requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
-    d = requestor.request('send', params)
-    d.addCallback(lambda result: println("Result: " + result))
-    requests.append(d)
-  results = defer.gatherResults(requests)
-
-  def replay_cb(result):
-    print("Replay Result: " + result)
-    reactor.stop()
-
-  def replay(_):
-    # try out a replay message
-    requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
-    d = requestor.request('replay', dict())
-    d.addCallback(replay_cb)
-
-  results.addCallback(replay)
-  reactor.run()
diff --git a/lang/py3/avro/tests/txsample_http_server.py b/lang/py3/avro/tests/txsample_http_server.py
deleted file mode 100644
index beb1267..0000000
--- a/lang/py3/avro/tests/txsample_http_server.py
+++ /dev/null
@@ -1,71 +0,0 @@
-#!/usr/bin/env python3
-# -*- mode: python -*-
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-
-
-from avro import ipc, protocol, txipc
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-from twisted.internet import reactor
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from twisted.web import server
-
-MAIL_PROTOCOL_JSON = """\
-{"namespace": "example.proto",
- "protocol": "Mail",
-
- "types": [
-     {"name": "Message", "type": "record",
-      "fields": [
-          {"name": "to",   "type": "string"},
-          {"name": "from", "type": "string"},
-          {"name": "body", "type": "string"}
-      ]
-     }
- ],
-
- "messages": {
-     "send": {
-         "request": [{"name": "message", "type": "Message"}],
-         "response": "string"
-     },
-     "replay": {
-         "request": [],
-         "response": "string"
-     }
- }
-}
-"""
-MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
-SERVER_ADDRESS = ('localhost', 9090)
-
-class MailResponder(ipc.Responder):
-  def __init__(self):
-    ipc.Responder.__init__(self, MAIL_PROTOCOL)
-
-  def invoke(self, message, request):
-    if message.name == 'send':
-      request_content = request['message']
-      response = "Sent message to %(to)s from %(from)s with body %(body)s" % \
-                 request_content
-      return response
-    elif message.name == 'replay':
-      return 'replay'
-
-if __name__ == '__main__':
-  root = server.Site(txipc.AvroResponderResource(MailResponder()))
-  reactor.listenTCP(9090, root)
-  reactor.run()
diff --git a/lang/py3/avro/txipc.py b/lang/py3/avro/txipc.py
deleted file mode 100644
index d8e3225..0000000
--- a/lang/py3/avro/txipc.py
+++ /dev/null
@@ -1,223 +0,0 @@
-#!/usr/bin/env python3
-# -*- mode: python -*-
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import io
-
-from zope.interface import implements
-
-from avro import io as avro_io
-from avro import ipc
-from twisted.internet.defer import Deferred, maybeDeferred
-from twisted.internet.protocol import Protocol
-from twisted.web import resource, server
-from twisted.web.client import Agent
-from twisted.web.http_headers import Headers
-from twisted.web.iweb import IBodyProducer
-
-
-class TwistedRequestor(ipc.BaseRequestor):
-  """A Twisted-compatible requestor. Returns a Deferred that will fire with the
-     returning value, instead of blocking until the request completes."""
-  def _process_handshake(self, call_response, message_name, request_datum):
-    # process the handshake and call response
-    buffer_decoder = avro_io.BinaryDecoder(io.StringIO(call_response))
-    call_response_exists = self.read_handshake_response(buffer_decoder)
-    if call_response_exists:
-      return self.read_call_response(message_name, buffer_decoder)
-    else:
-      return self.request(message_name, request_datum)
-
-  def issue_request(self, call_request, message_name, request_datum):
-    d = self.transceiver.transceive(call_request)
-    d.addCallback(self._process_handshake, message_name, request_datum)
-    return d
-
-class RequestStreamingProducer(object):
-  """A streaming producer for issuing requests with the Twisted.web Agent."""
-  implements(IBodyProducer)
-
-  paused = False
-  stopped = False
-  started = False
-
-  def __init__(self, message):
-    self._message = message
-    self._length = len(message)
-    # We need a buffer length header for every buffer and an additional
-    # zero-length buffer as the message terminator
-    self._length += (self._length / ipc.BUFFER_SIZE + 2) \
-      * ipc.BUFFER_HEADER_LENGTH
-    self._total_bytes_sent = 0
-    self._deferred = Deferred()
-
-  # read-only properties
-  message = property(lambda self: self._message)
-  length = property(lambda self: self._length)
-  consumer = property(lambda self: self._consumer)
-  deferred = property(lambda self: self._deferred)
-
-  def _get_total_bytes_sent(self):
-    return self._total_bytes_sent
-
-  def _set_total_bytes_sent(self, bytes_sent):
-    self._total_bytes_sent = bytes_sent
-
-  total_bytes_sent = property(_get_total_bytes_sent, _set_total_bytes_sent)
-
-  def startProducing(self, consumer):
-    if self.started:
-      return
-
-    self.started = True
-    self._consumer = consumer
-    # Keep writing data to the consumer until we're finished,
-    # paused (pauseProducing()) or stopped (stopProducing())
-    while self.length - self.total_bytes_sent > 0 and \
-      not self.paused and not self.stopped:
-      self.write()
-    # self.write will fire this deferred once it has written
-    # the entire message to the consumer
-    return self.deferred
-
-  def resumeProducing(self):
-    self.paused = False
-    self.write(self)
-
-  def pauseProducing(self):
-    self.paused = True
-
-  def stopProducing(self):
-    self.stopped = True
-
-  def write(self):
-    if self.length - self.total_bytes_sent > ipc.BUFFER_SIZE:
-      buffer_length = ipc.BUFFER_SIZE
-    else:
-      buffer_length = self.length - self.total_bytes_sent
-    self.write_buffer(self.message[self.total_bytes_sent:
-                              (self.total_bytes_sent + buffer_length)])
-    self.total_bytes_sent += buffer_length
-    # Make sure we wrote the entire message
-    if self.total_bytes_sent == self.length and not self.stopped:
-      self.stopProducing()
-      # A message is always terminated by a zero-length buffer.
-      self.write_buffer_length(0)
-      self.deferred.callback(None)
-
-  def write_buffer(self, chunk):
-    buffer_length = len(chunk)
-    self.write_buffer_length(buffer_length)
-    self.consumer.write(chunk)
-
-  def write_buffer_length(self, n):
-    self.consumer.write(ipc.BIG_ENDIAN_INT_STRUCT.pack(n))
-
-class AvroProtocol(Protocol):
-
-  recvd = ''
-  done = False
-
-  def __init__(self, finished):
-    self.finished = finished
-    self.message = []
-
-  def dataReceived(self, data):
-    self.recvd = self.recvd + data
-    while len(self.recvd) >= ipc.BUFFER_HEADER_LENGTH:
-      buffer_length ,= ipc.BIG_ENDIAN_INT_STRUCT.unpack(
-        self.recvd[:ipc.BUFFER_HEADER_LENGTH])
-      if buffer_length == 0:
-        response = ''.join(self.message)
-        self.done = True
-        self.finished.callback(response)
-        break
-      if len(self.recvd) < buffer_length + ipc.BUFFER_HEADER_LENGTH:
-        break
-      buffer = self.recvd[ipc.BUFFER_HEADER_LENGTH:buffer_length + ipc.BUFFER_HEADER_LENGTH]
-      self.recvd = self.recvd[buffer_length + ipc.BUFFER_HEADER_LENGTH:]
-      self.message.append(buffer)
-
-  def connectionLost(self, reason):
-    if not self.done:
-      self.finished.errback(ipc.ConnectionClosedException("Reader read 0 bytes."))
-
-class TwistedHTTPTransceiver(object):
-  """This transceiver uses the Agent class present in Twisted.web >= 9.0
-     for issuing requests to the remote endpoint."""
-  def __init__(self, host, port, remote_name=None, reactor=None):
-    self.url = "http://%s:%d/" % (host, port)
-
-    if remote_name is None:
-      # There's no easy way to get this peer's remote address
-      # in Twisted so I use a random UUID to identify ourselves
-      import uuid
-      self.remote_name = uuid.uuid4()
-
-    if reactor is None:
-      from twisted.internet import reactor
-    self.agent = Agent(reactor)
-
-  def read_framed_message(self, response):
-    finished = Deferred()
-    response.deliverBody(AvroProtocol(finished))
-    return finished
-
-  def transceive(self, request):
-    req_method = 'POST'
-    req_headers = {
-      'Content-Type': ['avro/binary'],
-      'Accept-Encoding': ['identity'],
-    }
-
-    body_producer = RequestStreamingProducer(request)
-    d = self.agent.request(
-      req_method,
-      self.url,
-      headers=Headers(req_headers),
-      bodyProducer=body_producer)
-    return d.addCallback(self.read_framed_message)
-
-class AvroResponderResource(resource.Resource):
-  """This Twisted.web resource can be placed anywhere in a URL hierarchy
-     to provide an Avro endpoint. Different Avro protocols can be served
-     by the same web server as long as they are in different resources in
-     a URL hierarchy."""
-  isLeaf = True
-
-  def __init__(self, responder):
-    resource.Resource.__init__(self)
-    self.responder = responder
-
-  def cb_render_POST(self, resp_body, request):
-    request.setResponseCode(200)
-    request.setHeader('Content-Type', 'avro/binary')
-    resp_writer = ipc.FramedWriter(request)
-    resp_writer.write_framed_message(resp_body)
-    request.finish()
-
-  def render_POST(self, request):
-    # Unfortunately, Twisted.web doesn't support incoming
-    # streamed input yet, the whole payload must be kept in-memory
-    request.content.seek(0, 0)
-    call_request_reader = ipc.FramedReader(request.content)
-    call_request = call_request_reader.read_framed_message()
-    d = maybeDeferred(self.responder.respond, call_request)
-    d.addCallback(self.cb_render_POST, request)
-    return server.NOT_DONE_YET

Reply via email to