This is an automated email from the ASF dual-hosted git repository.
kojiromike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new caa3a42 AVRO-2920 Remove Python2 Polyfills (#948)
caa3a42 is described below
commit caa3a4265bf0a8e6496b63938bc4ab0b6620683c
Author: Michael A. Smith <[email protected]>
AuthorDate: Thu Aug 27 19:45:23 2020 -0400
AVRO-2920 Remove Python2 Polyfills (#948)
* AVRO-2920: Remove Python2 Imports
* AVRO-2920: Remove Static-Only Class
* AVRO-2920: Remove (object) from Class Definitions
* AVRO-2920: Remove Python 2 `next` Definition
* AVRO-2920: Remove Unicode/Str/Bytes/Basestring Polyfills
* AVRO-2920: Remove int/long Polyfills
* AVRO-2920: Remove Import Polyfills
* AVRO-2920: Use abc.abstractmethod Enforcement
---
lang/py/avro/__init__.py | 2 -
lang/py/avro/codecs.py | 74 +++++++++----------
lang/py/avro/constants.py | 1 -
lang/py/avro/datafile.py | 13 ++--
lang/py/avro/io.py | 106 ++++++++++++---------------
lang/py/avro/ipc.py | 56 ++++++--------
lang/py/avro/protocol.py | 22 ++----
lang/py/avro/schema.py | 79 +++++++++-----------
lang/py/avro/test/gen_interop_data.py | 21 ++----
lang/py/avro/test/mock_tether_parent.py | 24 +++---
lang/py/avro/test/sample_http_client.py | 11 ++-
lang/py/avro/test/sample_http_server.py | 2 -
lang/py/avro/test/test_bench.py | 2 -
lang/py/avro/test/test_datafile.py | 2 +-
lang/py/avro/test/test_datafile_interop.py | 11 ++-
lang/py/avro/test/test_io.py | 84 ++++++++++-----------
lang/py/avro/test/test_ipc.py | 10 +--
lang/py/avro/test/test_protocol.py | 16 +---
lang/py/avro/test/test_schema.py | 82 +++++++++------------
lang/py/avro/test/test_script.py | 101 ++++++++++++-------------
lang/py/avro/test/test_tether_task.py | 14 +---
lang/py/avro/test/test_tether_task_runner.py | 19 ++---
lang/py/avro/test/test_tether_word_count.py | 13 +---
lang/py/avro/test/word_count_task.py | 2 -
lang/py/avro/tether/__init__.py | 2 -
lang/py/avro/tether/tether_task.py | 45 ++++++------
lang/py/avro/tether/tether_task_runner.py | 26 +++----
lang/py/avro/tether/util.py | 2 -
lang/py/avro/timezones.py | 16 ++--
lang/py/avro/tool.py | 44 +++++------
lang/py/scripts/avro | 52 +++++--------
lang/py/setup.py | 2 -
32 files changed, 385 insertions(+), 571 deletions(-)
diff --git a/lang/py/avro/__init__.py b/lang/py/avro/__init__.py
index b328aad..165ced5 100644
--- a/lang/py/avro/__init__.py
+++ b/lang/py/avro/__init__.py
@@ -19,8 +19,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import pkgutil
__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants',
'timezones', 'codecs']
diff --git a/lang/py/avro/codecs.py b/lang/py/avro/codecs.py
index d10d20c..4922e6d 100644
--- a/lang/py/avro/codecs.py
+++ b/lang/py/avro/codecs.py
@@ -28,15 +28,12 @@ so don't confuse it with the Python's "codecs", which is a
package mainly for
converting charsets (https://docs.python.org/3/library/codecs.html).
"""
-from __future__ import absolute_import, division, print_function
-
+import abc
+import binascii
import io
import struct
import sys
import zlib
-from abc import ABCMeta, abstractmethod
-from binascii import crc32
-from struct import Struct
import avro.errors
import avro.io
@@ -44,7 +41,7 @@ import avro.io
#
# Constants
#
-STRUCT_CRC32 = Struct('>I') # big-endian unsigned int
+STRUCT_CRC32 = struct.Struct('>I') # big-endian unsigned int
try:
@@ -64,11 +61,10 @@ except ImportError:
has_zstandard = False
-class Codec:
+class Codec(abc.ABC):
"""Abstract base class for all Avro codec classes."""
- __metaclass__ = ABCMeta
- @abstractmethod
+ @abc.abstractmethod
def compress(self, data):
"""Compress the passed data.
@@ -78,9 +74,8 @@ class Codec:
:rtype: tuple
:return: compressed data and its length
"""
- pass
- @abstractmethod
+ @abc.abstractmethod
def decompress(self, readers_decoder):
"""Read compressed data via the passed BinaryDecoder and decompress it.
@@ -92,7 +87,6 @@ class Codec:
:return: a newly instantiated BinaryDecoder object that contains the
decompressed data which is wrapped by a StringIO
"""
- pass
class NullCodec(Codec):
@@ -139,7 +133,7 @@ if has_snappy:
def compress(self, data):
compressed_data = snappy.compress(data)
# A 4-byte, big-endian CRC32 checksum
- compressed_data += STRUCT_CRC32.pack(crc32(data) & 0xffffffff)
+ compressed_data += STRUCT_CRC32.pack(binascii.crc32(data) &
0xffffffff)
return compressed_data, len(compressed_data)
def decompress(self, readers_decoder):
@@ -153,7 +147,7 @@ if has_snappy:
def check_crc32(self, bytes, checksum):
checksum = STRUCT_CRC32.unpack(checksum)[0]
- if crc32(bytes) & 0xffffffff != checksum:
+ if binascii.crc32(bytes) & 0xffffffff != checksum:
raise avro.errors.AvroException("Checksum failure")
@@ -177,30 +171,28 @@ if has_zstandard:
return avro.io.BinaryDecoder(io.BytesIO(uncompressed))
-class Codecs(object):
- @staticmethod
- def get_codec(codec_name):
- codec_name = codec_name.lower()
- if codec_name == "null":
- return NullCodec()
- if codec_name == "deflate":
- return DeflateCodec()
- if codec_name == "bzip2" and has_bzip2:
- return BZip2Codec()
- if codec_name == "snappy" and has_snappy:
- return SnappyCodec()
- if codec_name == "zstandard" and has_zstandard:
- return ZstandardCodec()
- raise avro.errors.UnsupportedCodec("Unsupported codec: {}. (Is it
installed?)"
- .format(codec_name))
-
- @staticmethod
- def supported_codec_names():
- codec_names = ['null', 'deflate']
- if has_bzip2:
- codec_names.append('bzip2')
- if has_snappy:
- codec_names.append('snappy')
- if has_zstandard:
- codec_names.append('zstandard')
- return codec_names
+def get_codec(codec_name):
+ codec_name = codec_name.lower()
+ if codec_name == "null":
+ return NullCodec()
+ if codec_name == "deflate":
+ return DeflateCodec()
+ if codec_name == "bzip2" and has_bzip2:
+ return BZip2Codec()
+ if codec_name == "snappy" and has_snappy:
+ return SnappyCodec()
+ if codec_name == "zstandard" and has_zstandard:
+ return ZstandardCodec()
+ raise avro.errors.UnsupportedCodec("Unsupported codec: {}. (Is it
installed?)"
+ .format(codec_name))
+
+
+def supported_codec_names():
+ codec_names = ['null', 'deflate']
+ if has_bzip2:
+ codec_names.append('bzip2')
+ if has_snappy:
+ codec_names.append('snappy')
+ if has_zstandard:
+ codec_names.append('zstandard')
+ return codec_names
diff --git a/lang/py/avro/constants.py b/lang/py/avro/constants.py
index 202a498..3a6fbf6 100644
--- a/lang/py/avro/constants.py
+++ b/lang/py/avro/constants.py
@@ -21,7 +21,6 @@
"""Contains Constants for Python Avro"""
-from __future__ import absolute_import, division, print_function
DATE = "date"
DECIMAL = "decimal"
diff --git a/lang/py/avro/datafile.py b/lang/py/avro/datafile.py
index 05deab6..03125ee 100644
--- a/lang/py/avro/datafile.py
+++ b/lang/py/avro/datafile.py
@@ -21,17 +21,15 @@
"""Read/Write Avro File Object Containers."""
-from __future__ import absolute_import, division, print_function
-
import io
import os
import random
import zlib
+import avro.codecs
import avro.errors
import avro.io
import avro.schema
-from avro.codecs import Codecs
#
# Constants
@@ -50,7 +48,7 @@ META_SCHEMA = avro.schema.parse("""\
""" % (MAGIC_SIZE, SYNC_SIZE))
NULL_CODEC = 'null'
-VALID_CODECS = Codecs.supported_codec_names()
+VALID_CODECS = avro.codecs.supported_codec_names()
VALID_ENCODINGS = ['binary'] # not used yet
CODEC_KEY = "avro.codec"
@@ -61,7 +59,7 @@ SCHEMA_KEY = "avro.schema"
#
-class _DataFile(object):
+class _DataFile:
"""Mixin for methods common to both reading and writing."""
block_count = 0
@@ -195,7 +193,7 @@ class DataFileWriter(_DataFile):
# write block contents
uncompressed_data = self.buffer_writer.getvalue()
- codec = Codecs.get_codec(self.codec)
+ codec = avro.codecs.get_codec(self.codec)
compressed_data, compressed_data_length =
codec.compress(uncompressed_data)
# Write length of block
@@ -307,7 +305,7 @@ class DataFileReader(_DataFile):
def _read_block_header(self):
self.block_count = self.raw_decoder.read_long()
- codec = Codecs.get_codec(self.codec)
+ codec = avro.codecs.get_codec(self.codec)
self._datum_decoder = codec.decompress(self.raw_decoder)
def _skip_sync(self):
@@ -331,7 +329,6 @@ class DataFileReader(_DataFile):
datum = self.datum_reader.read(self.datum_decoder)
self.block_count -= 1
return datum
- next = __next__
def close(self):
"""Close this reader."""
diff --git a/lang/py/avro/io.py b/lang/py/avro/io.py
index a476a7d..96459d4 100644
--- a/lang/py/avro/io.py
+++ b/lang/py/avro/io.py
@@ -34,10 +34,10 @@ uses the following mapping:
* Schema records are implemented as dict.
* Schema arrays are implemented as list.
* Schema maps are implemented as dict.
- * Schema strings are implemented as unicode.
- * Schema bytes are implemented as str.
+ * Schema strings are implemented as str.
+ * Schema bytes are implemented as bytes.
* Schema ints are implemented as int.
- * Schema longs are implemented as long.
+ * Schema longs are implemented as int.
* Schema floats are implemented as float.
* Schema doubles are implemented as float.
* Schema booleans are implemented as bool.
@@ -86,33 +86,15 @@ datum contained. This allows iteration over the child nodes
in that datum, if there are any.
"""
-from __future__ import absolute_import, division, print_function
-
+import collections
import datetime
+import decimal
import json
import struct
-from collections import deque, namedtuple
-from decimal import Decimal, getcontext
-from struct import Struct
+import avro.constants
import avro.errors
-from avro import constants, timezones
-
-try:
- unicode
-except NameError:
- unicode = str
-
-try:
- basestring # type: ignore
-except NameError:
- basestring = (bytes, unicode)
-
-try:
- long
-except NameError:
- long = int
-
+import avro.timezones
#
# Constants
@@ -120,11 +102,11 @@ except NameError:
# TODO(hammer): shouldn't ! be < for little-endian (according to spec?)
-STRUCT_FLOAT = Struct('<f') # big-endian float
-STRUCT_DOUBLE = Struct('<d') # big-endian double
-STRUCT_SIGNED_SHORT = Struct('>h') # big-endian signed short
-STRUCT_SIGNED_INT = Struct('>i') # big-endian signed int
-STRUCT_SIGNED_LONG = Struct('>q') # big-endian signed long
+STRUCT_FLOAT = struct.Struct('<f') # big-endian float
+STRUCT_DOUBLE = struct.Struct('<d') # big-endian double
+STRUCT_SIGNED_SHORT = struct.Struct('>h') # big-endian signed short
+STRUCT_SIGNED_INT = struct.Struct('>i') # big-endian signed int
+STRUCT_SIGNED_LONG = struct.Struct('>q') # big-endian signed long
#
@@ -132,7 +114,7 @@ STRUCT_SIGNED_LONG = Struct('>q') # big-endian signed
long
#
-ValidationNode = namedtuple("ValidationNode", ['schema', 'datum', 'name'])
+ValidationNode = collections.namedtuple("ValidationNode", ['schema', 'datum',
'name'])
def validate(expected_schema, datum, raise_on_error=False):
@@ -150,7 +132,7 @@ def validate(expected_schema, datum, raise_on_error=False):
:returns: True if datum is valid for expected_schema, False if not.
"""
# use a FIFO queue to process schema nodes breadth first.
- nodes = deque()
+ nodes = collections.deque()
nodes.append(ValidationNode(expected_schema, datum,
getattr(expected_schema, "name", None)))
while nodes:
@@ -235,7 +217,7 @@ _ITERATORS['error'] = _ITERATORS['request'] =
_ITERATORS['record']
# Decoder/Encoder
#
-class BinaryDecoder(object):
+class BinaryDecoder:
"""Read leaf values."""
def __init__(self, reader):
@@ -331,10 +313,12 @@ class BinaryDecoder(object):
unscaled_datum <<= 8
unscaled_datum += ord(datum[offset:1 + offset])
- original_prec = getcontext().prec
- getcontext().prec = precision
- scaled_datum = Decimal(unscaled_datum).scaleb(-scale)
- getcontext().prec = original_prec
+ original_prec = decimal.getcontext().prec
+ try:
+ decimal.getcontext().prec = precision
+ scaled_datum = decimal.Decimal(unscaled_datum).scaleb(-scale)
+ finally:
+ decimal.getcontext().prec = original_prec
return scaled_datum
def read_bytes(self):
@@ -348,7 +332,7 @@ class BinaryDecoder(object):
A string is encoded as a long followed by
that many bytes of UTF-8 encoded character data.
"""
- return unicode(self.read_bytes(), "utf-8")
+ return self.read_bytes().decode("utf-8")
def read_date_from_int(self):
"""
@@ -396,7 +380,7 @@ class BinaryDecoder(object):
"""
timestamp_millis = self.read_long()
timedelta = datetime.timedelta(microseconds=timestamp_millis * 1000)
- unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0,
tzinfo=timezones.utc)
+ unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0,
tzinfo=avro.timezones.utc)
return unix_epoch_datetime + timedelta
def read_timestamp_micros_from_long(self):
@@ -406,7 +390,7 @@ class BinaryDecoder(object):
"""
timestamp_micros = self.read_long()
timedelta = datetime.timedelta(microseconds=timestamp_micros)
- unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0,
tzinfo=timezones.utc)
+ unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0,
tzinfo=avro.timezones.utc)
return unix_epoch_datetime + timedelta
def skip_null(self):
@@ -439,7 +423,7 @@ class BinaryDecoder(object):
self.reader.seek(self.reader.tell() + n)
-class BinaryEncoder(object):
+class BinaryEncoder:
"""Write leaf values."""
def __init__(self, writer):
@@ -617,26 +601,26 @@ class BinaryEncoder(object):
Encode python datetime object as long.
It stores the number of milliseconds from midnight of unix epoch, 1
January 1970.
"""
- datum = datum.astimezone(tz=timezones.utc)
- timedelta = datum - datetime.datetime(1970, 1, 1, 0, 0, 0, 0,
tzinfo=timezones.utc)
- milliseconds = self._timedelta_total_microseconds(timedelta) / 1000
- self.write_long(long(milliseconds))
+ datum = datum.astimezone(tz=avro.timezones.utc)
+ timedelta = datum - datetime.datetime(1970, 1, 1, 0, 0, 0, 0,
tzinfo=avro.timezones.utc)
+ milliseconds = self._timedelta_total_microseconds(timedelta) // 1000
+ self.write_long(milliseconds)
def write_timestamp_micros_long(self, datum):
"""
Encode python datetime object as long.
It stores the number of microseconds from midnight of unix epoch, 1
January 1970.
"""
- datum = datum.astimezone(tz=timezones.utc)
- timedelta = datum - datetime.datetime(1970, 1, 1, 0, 0, 0, 0,
tzinfo=timezones.utc)
+ datum = datum.astimezone(tz=avro.timezones.utc)
+ timedelta = datum - datetime.datetime(1970, 1, 1, 0, 0, 0, 0,
tzinfo=avro.timezones.utc)
microseconds = self._timedelta_total_microseconds(timedelta)
- self.write_long(long(microseconds))
+ self.write_long(microseconds)
#
# DatumReader/Writer
#
-class DatumReader(object):
+class DatumReader:
"""Deserialize Avro-encoded data into a Python data structure."""
def __init__(self, writers_schema=None, readers_schema=None):
@@ -693,17 +677,17 @@ class DatumReader(object):
elif writers_schema.type == 'string':
return decoder.read_utf8()
elif writers_schema.type == 'int':
- if logical_type == constants.DATE:
+ if logical_type == avro.constants.DATE:
return decoder.read_date_from_int()
- if logical_type == constants.TIME_MILLIS:
+ if logical_type == avro.constants.TIME_MILLIS:
return decoder.read_time_millis_from_int()
return decoder.read_int()
elif writers_schema.type == 'long':
- if logical_type == constants.TIME_MICROS:
+ if logical_type == avro.constants.TIME_MICROS:
return decoder.read_time_micros_from_long()
- elif logical_type == constants.TIMESTAMP_MILLIS:
+ elif logical_type == avro.constants.TIMESTAMP_MILLIS:
return decoder.read_timestamp_millis_from_long()
- elif logical_type == constants.TIMESTAMP_MICROS:
+ elif logical_type == avro.constants.TIMESTAMP_MICROS:
return decoder.read_timestamp_micros_from_long()
else:
return decoder.read_long()
@@ -968,7 +952,7 @@ class DatumReader(object):
elif field_schema.type == 'int':
return int(default_value)
elif field_schema.type == 'long':
- return long(default_value)
+ return int(default_value)
elif field_schema.type in ['float', 'double']:
return float(default_value)
elif field_schema.type in ['enum', 'fixed', 'string', 'bytes']:
@@ -1001,7 +985,7 @@ class DatumReader(object):
raise avro.errors.AvroException(fail_msg)
-class DatumWriter(object):
+class DatumWriter:
"""DatumWriter for generic python objects."""
def __init__(self, writers_schema=None):
@@ -1027,18 +1011,18 @@ class DatumWriter(object):
elif writers_schema.type == 'string':
encoder.write_utf8(datum)
elif writers_schema.type == 'int':
- if logical_type == constants.DATE:
+ if logical_type == avro.constants.DATE:
encoder.write_date_int(datum)
- elif logical_type == constants.TIME_MILLIS:
+ elif logical_type == avro.constants.TIME_MILLIS:
encoder.write_time_millis_int(datum)
else:
encoder.write_int(datum)
elif writers_schema.type == 'long':
- if logical_type == constants.TIME_MICROS:
+ if logical_type == avro.constants.TIME_MICROS:
encoder.write_time_micros_long(datum)
- elif logical_type == constants.TIMESTAMP_MILLIS:
+ elif logical_type == avro.constants.TIMESTAMP_MILLIS:
encoder.write_timestamp_millis_long(datum)
- elif logical_type == constants.TIMESTAMP_MICROS:
+ elif logical_type == avro.constants.TIMESTAMP_MICROS:
encoder.write_timestamp_micros_long(datum)
else:
encoder.write_long(datum)
diff --git a/lang/py/avro/ipc.py b/lang/py/avro/ipc.py
index c2cc852..458a722 100644
--- a/lang/py/avro/ipc.py
+++ b/lang/py/avro/ipc.py
@@ -21,25 +21,15 @@
"""Support for inter-process calls."""
-from __future__ import absolute_import, division, print_function
-
+import http.client
import io
import os
-from struct import Struct
+import struct
import avro.errors
import avro.io
-from avro import protocol, schema
-
-try:
- import httplib # type: ignore
-except ImportError:
- from http import client as httplib # type: ignore
-
-try:
- unicode
-except NameError:
- unicode = str
+import avro.protocol
+import avro.schema
def _load(name):
@@ -51,25 +41,25 @@ def _load(name):
HANDSHAKE_REQUEST_SCHEMA_JSON = _load('HandshakeRequest.avsc')
HANDSHAKE_RESPONSE_SCHEMA_JSON = _load('HandshakeResponse.avsc')
-HANDSHAKE_REQUEST_SCHEMA = schema.parse(HANDSHAKE_REQUEST_SCHEMA_JSON)
-HANDSHAKE_RESPONSE_SCHEMA = schema.parse(HANDSHAKE_RESPONSE_SCHEMA_JSON)
+HANDSHAKE_REQUEST_SCHEMA = avro.schema.parse(HANDSHAKE_REQUEST_SCHEMA_JSON)
+HANDSHAKE_RESPONSE_SCHEMA = avro.schema.parse(HANDSHAKE_RESPONSE_SCHEMA_JSON)
HANDSHAKE_REQUESTOR_WRITER = avro.io.DatumWriter(HANDSHAKE_REQUEST_SCHEMA)
HANDSHAKE_REQUESTOR_READER = avro.io.DatumReader(HANDSHAKE_RESPONSE_SCHEMA)
HANDSHAKE_RESPONDER_WRITER = avro.io.DatumWriter(HANDSHAKE_RESPONSE_SCHEMA)
HANDSHAKE_RESPONDER_READER = avro.io.DatumReader(HANDSHAKE_REQUEST_SCHEMA)
-META_SCHEMA = schema.parse('{"type": "map", "values": "bytes"}')
+META_SCHEMA = avro.schema.parse('{"type": "map", "values": "bytes"}')
META_WRITER = avro.io.DatumWriter(META_SCHEMA)
META_READER = avro.io.DatumReader(META_SCHEMA)
-SYSTEM_ERROR_SCHEMA = schema.parse('["string"]')
+SYSTEM_ERROR_SCHEMA = avro.schema.parse('["string"]')
# protocol cache
REMOTE_HASHES = {}
REMOTE_PROTOCOLS = {}
-BIG_ENDIAN_INT_STRUCT = Struct('!I')
+BIG_ENDIAN_INT_STRUCT = struct.Struct('!I')
BUFFER_HEADER_LENGTH = 4
BUFFER_SIZE = 8192
@@ -78,7 +68,7 @@ BUFFER_SIZE = 8192
#
-class BaseRequestor(object):
+class BaseRequestor:
"""Base class for the client side of a protocol interaction."""
def __init__(self, local_protocol, transceiver):
@@ -133,7 +123,7 @@ class BaseRequestor(object):
request_datum['clientHash'] = local_hash
request_datum['serverHash'] = remote_hash
if self.send_protocol:
- request_datum['clientProtocol'] = unicode(self.local_protocol)
+ request_datum['clientProtocol'] = str(self.local_protocol)
HANDSHAKE_REQUESTOR_WRITER.write(request_datum, encoder)
def write_call_request(self, message_name, request_datum, encoder):
@@ -170,7 +160,7 @@ class BaseRequestor(object):
elif match == 'CLIENT':
if self.send_protocol:
raise avro.errors.AvroException('Handshake failure.')
- self.remote_protocol = protocol.parse(
+ self.remote_protocol = avro.protocol.parse(
handshake_response.get('serverProtocol'))
self.remote_hash = handshake_response.get('serverHash')
self.send_protocol = False
@@ -178,7 +168,7 @@ class BaseRequestor(object):
elif match == 'NONE':
if self.send_protocol:
raise avro.errors.AvroException('Handshake failure.')
- self.remote_protocol = protocol.parse(
+ self.remote_protocol = avro.protocol.parse(
handshake_response.get('serverProtocol'))
self.remote_hash = handshake_response.get('serverHash')
self.send_protocol = True
@@ -239,7 +229,7 @@ class Requestor(BaseRequestor):
return self.request(message_name, request_datum)
-class Responder(object):
+class Responder:
"""Base class for the server side of a protocol interaction."""
def __init__(self, local_protocol):
@@ -303,7 +293,7 @@ class Responder(object):
except AvroRemoteException as e:
error = e
except Exception as e:
- error = AvroRemoteException(unicode(e))
+ error = AvroRemoteException(str(e))
# write response using local protocol
META_WRITER.write(response_metadata, buffer_encoder)
@@ -315,7 +305,7 @@ class Responder(object):
writers_schema = local_message.errors
self.write_error(writers_schema, error, buffer_encoder)
except schema.AvroException as e:
- error = AvroRemoteException(unicode(e))
+ error = AvroRemoteException(str(e))
buffer_encoder = avro.io.BinaryEncoder(io.BytesIO())
META_WRITER.write(response_metadata, buffer_encoder)
buffer_encoder.write_boolean(True)
@@ -331,7 +321,7 @@ class Responder(object):
client_protocol = handshake_request.get('clientProtocol')
remote_protocol = self.get_protocol_cache(client_hash)
if remote_protocol is None and client_protocol is not None:
- remote_protocol = protocol.parse(client_protocol)
+ remote_protocol = avro.protocol.parse(client_protocol)
self.set_protocol_cache(client_hash, remote_protocol)
# evaluate remote's guess of the local protocol
@@ -348,7 +338,7 @@ class Responder(object):
handshake_response['match'] = 'CLIENT'
if handshake_response['match'] != 'BOTH':
- handshake_response['serverProtocol'] = unicode(self.local_protocol)
+ handshake_response['serverProtocol'] = str(self.local_protocol)
handshake_response['serverHash'] = self.local_hash
HANDSHAKE_RESPONDER_WRITER.write(handshake_response, encoder)
@@ -370,14 +360,14 @@ class Responder(object):
def write_error(self, writers_schema, error_exception, encoder):
datum_writer = avro.io.DatumWriter(writers_schema)
- datum_writer.write(unicode(error_exception), encoder)
+ datum_writer.write(str(error_exception), encoder)
#
# Utility classes
#
-class FramedReader(object):
+class FramedReader:
"""Wrapper around a file-like object to read framed data."""
def __init__(self, reader):
@@ -407,7 +397,7 @@ class FramedReader(object):
return BIG_ENDIAN_INT_STRUCT.unpack(read)[0]
-class FramedWriter(object):
+class FramedWriter:
"""Wrapper around a file-like object to write framed data."""
def __init__(self, writer):
@@ -443,7 +433,7 @@ class FramedWriter(object):
#
-class HTTPTransceiver(object):
+class HTTPTransceiver:
"""
A simple HTTP-based transceiver implementation.
Useful for clients but not for servers
@@ -451,7 +441,7 @@ class HTTPTransceiver(object):
def __init__(self, host, port, req_resource='/'):
self.req_resource = req_resource
- self.conn = httplib.HTTPConnection(host, port)
+ self.conn = http.client.HTTPConnection(host, port)
self.conn.connect()
self.remote_name = self.conn.sock.getsockname()
diff --git a/lang/py/avro/protocol.py b/lang/py/avro/protocol.py
index 279735c..483f63e 100644
--- a/lang/py/avro/protocol.py
+++ b/lang/py/avro/protocol.py
@@ -21,24 +21,12 @@
"""Protocol implementation."""
-from __future__ import absolute_import, division, print_function
-
import hashlib
import json
import avro.errors
import avro.schema
-try:
- unicode
-except NameError:
- unicode = str
-
-try:
- basestring # type: ignore
-except NameError:
- basestring = (bytes, unicode)
-
#
# Constants
#
@@ -51,7 +39,7 @@ VALID_TYPE_SCHEMA_TYPES = ('enum', 'record', 'error', 'fixed')
#
-class Protocol(object):
+class Protocol:
"""An application protocol."""
def _parse_types(self, types, type_names):
@@ -85,10 +73,10 @@ class Protocol(object):
if not name:
fail_msg = 'Protocols must have a non-empty name.'
raise avro.errors.ProtocolParseException(fail_msg)
- elif not isinstance(name, basestring):
+ elif not isinstance(name, str):
fail_msg = 'The name property must be a string.'
raise avro.errors.ProtocolParseException(fail_msg)
- elif not (namespace is None or isinstance(namespace, basestring)):
+ elif not (namespace is None or isinstance(namespace, str)):
fail_msg = 'The namespace property must be a string.'
raise avro.errors.ProtocolParseException(fail_msg)
elif not (types is None or isinstance(types, list)):
@@ -173,7 +161,7 @@ class Protocol(object):
return to_cmp == json.loads(str(that))
-class Message(object):
+class Message:
"""A Protocol message."""
def _parse_request(self, request, names):
@@ -183,7 +171,7 @@ class Message(object):
return avro.schema.RecordSchema(None, None, request, names, 'request')
def _parse_response(self, response, names):
- if isinstance(response, basestring) and names.has_name(response, None):
+ if isinstance(response, str) and names.has_name(response, None):
return names.get_name(response, None)
else:
return avro.schema.make_avsc_object(response, names)
diff --git a/lang/py/avro/schema.py b/lang/py/avro/schema.py
index 5aa9977..2795268 100644
--- a/lang/py/avro/schema.py
+++ b/lang/py/avro/schema.py
@@ -40,28 +40,17 @@ A schema may be one of:
Null.
"""
-from __future__ import absolute_import, division, print_function
-
+import abc
import datetime
+import decimal
import json
import math
import re
import sys
import warnings
-from decimal import Decimal
+import avro.constants
import avro.errors
-from avro import constants
-
-try:
- unicode
-except NameError:
- unicode = str
-
-try:
- basestring # type: ignore
-except NameError:
- basestring = (bytes, unicode)
#
# Constants
@@ -147,13 +136,13 @@ def _is_timezone_aware_datetime(dt):
# Base Classes
#
-class Schema(object):
+class Schema(abc.ABC):
"""Base class for all Schema classes."""
_props = None
def __init__(self, type, other_props=None):
# Ensure valid ctor args
- if not isinstance(type, basestring):
+ if not isinstance(type, str):
fail_msg = 'Schema type must be a string.'
raise avro.errors.SchemaParseException(fail_msg)
elif type not in VALID_TYPES:
@@ -185,13 +174,13 @@ class Schema(object):
"""
return all(getattr(self, prop) == getattr(other, prop) for prop in
props)
+ @abc.abstractmethod
def match(self, writer):
"""Return True if the current schema (as reader) matches the writer
schema.
@arg writer: the writer schema to match against.
@return bool
"""
- raise NotImplemented("Must be implemented by subclasses")
# utility functions to manipulate properties dict
def get_prop(self, key):
@@ -203,6 +192,7 @@ class Schema(object):
def __str__(self):
return json.dumps(self.to_json())
+ @abc.abstractmethod
def to_json(self, names):
"""
Converts the schema object into its AVRO specification representation.
@@ -211,7 +201,6 @@ class Schema(object):
be aware of not re-defining schemas that are already listed
in the parameter names.
"""
- raise NotImplemented("Must be implemented by subclasses.")
def validate(self, datum):
"""Returns the appropriate schema object if datum is valid for that
schema, else None.
@@ -226,7 +215,7 @@ class Schema(object):
raise Exception("Must be implemented by subclasses.")
-class Name(object):
+class Name:
"""Class to describe Avro name."""
_full = None
@@ -289,7 +278,7 @@ class Name(object):
return self.space
-class Names(object):
+class Names:
"""Track name set and default namespace during parsing."""
def __init__(self, default_namespace=None):
@@ -353,10 +342,10 @@ class NamedSchema(Schema):
if not name:
fail_msg = 'Named Schemas must have a non-empty name.'
raise avro.errors.SchemaParseException(fail_msg)
- elif not isinstance(name, basestring):
+ elif not isinstance(name, str):
fail_msg = 'The name property must be a string.'
raise avro.errors.SchemaParseException(fail_msg)
- elif namespace is not None and not isinstance(namespace, basestring):
+ elif namespace is not None and not isinstance(namespace, str):
fail_msg = 'The namespace property must be a string.'
raise avro.errors.SchemaParseException(fail_msg)
@@ -387,7 +376,7 @@ class NamedSchema(Schema):
#
-class LogicalSchema(object):
+class LogicalSchema:
def __init__(self, logical_type):
self.logical_type = logical_type
@@ -418,14 +407,14 @@ class DecimalLogicalSchema(LogicalSchema):
super(DecimalLogicalSchema, self).__init__('decimal')
-class Field(object):
+class Field:
def __init__(self, type, name, has_default, default=None,
order=None, names=None, doc=None, other_props=None):
# Ensure valid ctor args
if not name:
fail_msg = 'Fields must have a non-empty name.'
raise avro.errors.SchemaParseException(fail_msg)
- elif not isinstance(name, basestring):
+ elif not isinstance(name, str):
fail_msg = 'The name property must be a string.'
raise avro.errors.SchemaParseException(fail_msg)
elif order is not None and order not in VALID_FIELD_SORT_ORDERS:
@@ -437,7 +426,7 @@ class Field(object):
self._has_default = has_default
self._props.update(other_props or {})
- if (isinstance(type, basestring) and names is not None and
+ if (isinstance(type, str) and names is not None and
names.has_name(type, None)):
type_schema = names.get_name(type, None)
else:
@@ -501,7 +490,7 @@ class PrimitiveSchema(Schema):
_validators = {
'null': lambda x: x is None,
'boolean': lambda x: isinstance(x, bool),
- 'string': lambda x: isinstance(x, unicode),
+ 'string': lambda x: isinstance(x, str),
'bytes': lambda x: isinstance(x, bytes),
'int': lambda x: isinstance(x, int) and INT_MIN_VALUE <= x <=
INT_MAX_VALUE,
'long': lambda x: isinstance(x, int) and LONG_MIN_VALUE <= x <=
LONG_MAX_VALUE,
@@ -570,7 +559,7 @@ class BytesDecimalSchema(PrimitiveSchema,
DecimalLogicalSchema):
def validate(self, datum):
"""Return self if datum is a Decimal object, else None."""
- return self if isinstance(datum, Decimal) else None
+ return self if isinstance(datum, decimal.Decimal) else None
def __eq__(self, that):
return self.props == that.props
@@ -641,7 +630,7 @@ class FixedDecimalSchema(FixedSchema, DecimalLogicalSchema):
def validate(self, datum):
"""Return self if datum is a Decimal object, else None."""
- return self if isinstance(datum, Decimal) else None
+ return self if isinstance(datum, decimal.Decimal) else None
def __eq__(self, that):
return self.props == that.props
@@ -710,7 +699,7 @@ class ArraySchema(Schema):
Schema.__init__(self, 'array', other_props)
# Add class members
- if isinstance(items, basestring) and names.has_name(items, None):
+ if isinstance(items, str) and names.has_name(items, None):
items_schema = names.get_name(items, None)
else:
try:
@@ -755,7 +744,7 @@ class MapSchema(Schema):
Schema.__init__(self, 'map', other_props)
# Add class members
- if isinstance(values, basestring) and names.has_name(values, None):
+ if isinstance(values, str) and names.has_name(values, None):
values_schema = names.get_name(values, None)
else:
try:
@@ -787,7 +776,7 @@ class MapSchema(Schema):
def validate(self, datum):
"""Return self if datum is a valid representation of this schema, else
None."""
- return self if isinstance(datum, dict) and all(isinstance(key,
unicode) for key in datum) else None
+ return self if isinstance(datum, dict) and all(isinstance(key, str)
for key in datum) else None
def __eq__(self, that):
to_cmp = json.loads(str(self))
@@ -811,7 +800,7 @@ class UnionSchema(Schema):
# Add class members
schema_objects = []
for schema in schemas:
- if isinstance(schema, basestring) and names.has_name(schema, None):
+ if isinstance(schema, str) and names.has_name(schema, None):
new_schema = names.get_name(schema, None)
else:
try:
@@ -989,7 +978,7 @@ class RecordSchema(NamedSchema):
class DateSchema(LogicalSchema, PrimitiveSchema):
def __init__(self, other_props=None):
- LogicalSchema.__init__(self, constants.DATE)
+ LogicalSchema.__init__(self, avro.constants.DATE)
PrimitiveSchema.__init__(self, 'int', other_props)
def to_json(self, names=None):
@@ -1009,7 +998,7 @@ class DateSchema(LogicalSchema, PrimitiveSchema):
class TimeMillisSchema(LogicalSchema, PrimitiveSchema):
def __init__(self, other_props=None):
- LogicalSchema.__init__(self, constants.TIME_MILLIS)
+ LogicalSchema.__init__(self, avro.constants.TIME_MILLIS)
PrimitiveSchema.__init__(self, 'int', other_props)
def to_json(self, names=None):
@@ -1029,7 +1018,7 @@ class TimeMillisSchema(LogicalSchema, PrimitiveSchema):
class TimeMicrosSchema(LogicalSchema, PrimitiveSchema):
def __init__(self, other_props=None):
- LogicalSchema.__init__(self, constants.TIME_MICROS)
+ LogicalSchema.__init__(self, avro.constants.TIME_MICROS)
PrimitiveSchema.__init__(self, 'long', other_props)
def to_json(self, names=None):
@@ -1049,7 +1038,7 @@ class TimeMicrosSchema(LogicalSchema, PrimitiveSchema):
class TimestampMillisSchema(LogicalSchema, PrimitiveSchema):
def __init__(self, other_props=None):
- LogicalSchema.__init__(self, constants.TIMESTAMP_MILLIS)
+ LogicalSchema.__init__(self, avro.constants.TIMESTAMP_MILLIS)
PrimitiveSchema.__init__(self, 'long', other_props)
def to_json(self, names=None):
@@ -1068,7 +1057,7 @@ class TimestampMillisSchema(LogicalSchema,
PrimitiveSchema):
class TimestampMicrosSchema(LogicalSchema, PrimitiveSchema):
def __init__(self, other_props=None):
- LogicalSchema.__init__(self, constants.TIMESTAMP_MICROS)
+ LogicalSchema.__init__(self, avro.constants.TIMESTAMP_MICROS)
PrimitiveSchema.__init__(self, 'long', other_props)
def to_json(self, names=None):
@@ -1102,14 +1091,14 @@ def make_bytes_decimal_schema(other_props):
def make_logical_schema(logical_type, type_, other_props):
"""Map the logical types to the appropriate literal type and schema
class."""
logical_types = {
- (constants.DATE, 'int'): DateSchema,
- (constants.DECIMAL, 'bytes'): make_bytes_decimal_schema,
+ (avro.constants.DATE, 'int'): DateSchema,
+ (avro.constants.DECIMAL, 'bytes'): make_bytes_decimal_schema,
# The fixed decimal schema is handled later by returning None now.
- (constants.DECIMAL, 'fixed'): lambda x: None,
- (constants.TIMESTAMP_MICROS, 'long'): TimestampMicrosSchema,
- (constants.TIMESTAMP_MILLIS, 'long'): TimestampMillisSchema,
- (constants.TIME_MICROS, 'long'): TimeMicrosSchema,
- (constants.TIME_MILLIS, 'int'): TimeMillisSchema,
+ (avro.constants.DECIMAL, 'fixed'): lambda x: None,
+ (avro.constants.TIMESTAMP_MICROS, 'long'): TimestampMicrosSchema,
+ (avro.constants.TIMESTAMP_MILLIS, 'long'): TimestampMillisSchema,
+ (avro.constants.TIME_MICROS, 'long'): TimeMicrosSchema,
+ (avro.constants.TIME_MILLIS, 'int'): TimeMillisSchema,
}
try:
schema_type = logical_types.get((logical_type, type_), None)
diff --git a/lang/py/avro/test/gen_interop_data.py
b/lang/py/avro/test/gen_interop_data.py
index 802c4b0..37d1fce 100644
--- a/lang/py/avro/test/gen_interop_data.py
+++ b/lang/py/avro/test/gen_interop_data.py
@@ -20,41 +20,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import os
import sys
+import avro.codecs
import avro.datafile
import avro.io
import avro.schema
-from avro.codecs import Codecs
-
-try:
- unicode
-except NameError:
- unicode = str
NULL_CODEC = 'null'
-CODECS_TO_VALIDATE = Codecs.supported_codec_names()
+CODECS_TO_VALIDATE = avro.codecs.supported_codec_names()
DATUM = {
'intField': 12,
'longField': 15234324,
- 'stringField': unicode('hey'),
+ 'stringField': 'hey',
'boolField': True,
'floatField': 1234.0,
'doubleField': -1234.0,
'bytesField': b'12312adf',
'nullField': None,
'arrayField': [5.0, 0.0, 12.0],
- 'mapField': {unicode('a'): {'label': unicode('a')},
- unicode('bee'): {'label': unicode('cee')}},
+ 'mapField': {'a': {'label': 'a'},
+ 'bee': {'label': 'cee'}},
'unionField': 12.0,
'enumField': 'C',
'fixedField': b'1019181716151413',
- 'recordField': {'label': unicode('blah'),
- 'children': [{'label': unicode('inner'), 'children': []}]},
+ 'recordField': {'label': 'blah',
+ 'children': [{'label': 'inner', 'children': []}]},
}
diff --git a/lang/py/avro/test/mock_tether_parent.py
b/lang/py/avro/test/mock_tether_parent.py
index 60dcaf1..6fa8c82 100644
--- a/lang/py/avro/test/mock_tether_parent.py
+++ b/lang/py/avro/test/mock_tether_parent.py
@@ -19,32 +19,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
+import http.server
import socket
import sys
import avro.errors
+import avro.ipc
+import avro.protocol
import avro.tether.tether_task
import avro.tether.util
-from avro import ipc, protocol
-
-try:
- import BaseHTTPServer as http_server # type: ignore
-except ImportError:
- from http import server as http_server # type: ignore
-
SERVER_ADDRESS = ('localhost', avro.tether.util.find_port())
-class MockParentResponder(ipc.Responder):
+class MockParentResponder(avro.ipc.Responder):
"""
The responder for the mocked parent
"""
def __init__(self):
- ipc.Responder.__init__(self, avro.tether.tether_task.outputProtocol)
+ avro.ipc.Responder.__init__(self,
avro.tether.tether_task.outputProtocol)
def invoke(self, message, request):
if message.name == 'configure':
@@ -63,19 +57,19 @@ class MockParentResponder(ipc.Responder):
return None
-class MockParentHandler(http_server.BaseHTTPRequestHandler):
+class MockParentHandler(http.server.BaseHTTPRequestHandler):
"""Create a handler for the parent.
"""
def do_POST(self):
self.responder = MockParentResponder()
- call_request_reader = ipc.FramedReader(self.rfile)
+ call_request_reader = avro.ipc.FramedReader(self.rfile)
call_request = call_request_reader.read_framed_message()
resp_body = self.responder.respond(call_request)
self.send_response(200)
self.send_header('Content-Type', 'avro/binary')
self.end_headers()
- resp_writer = ipc.FramedWriter(self.wfile)
+ resp_writer = avro.ipc.FramedWriter(self.wfile)
resp_writer.write_framed_message(resp_body)
@@ -95,6 +89,6 @@ if __name__ == '__main__':
# flush the output so it shows up in the parent process
sys.stdout.flush()
- parent_server = http_server.HTTPServer(SERVER_ADDRESS,
MockParentHandler)
+ parent_server = http.server.HTTPServer(SERVER_ADDRESS,
MockParentHandler)
parent_server.allow_reuse_address = True
parent_server.serve_forever()
diff --git a/lang/py/avro/test/sample_http_client.py
b/lang/py/avro/test/sample_http_client.py
index 6bc6902..1b9ab6f 100644
--- a/lang/py/avro/test/sample_http_client.py
+++ b/lang/py/avro/test/sample_http_client.py
@@ -20,12 +20,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import sys
import avro.errors
-from avro import ipc, protocol
+import avro.ipc
+import avro.protocol
MAIL_PROTOCOL_JSON = """\
{"namespace": "example.proto",
@@ -53,14 +52,14 @@ MAIL_PROTOCOL_JSON = """\
}
}
"""
-MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
+MAIL_PROTOCOL = avro.protocol.parse(MAIL_PROTOCOL_JSON)
SERVER_HOST = 'localhost'
SERVER_PORT = 9090
def make_requestor(server_host, server_port, protocol):
- client = ipc.HTTPTransceiver(SERVER_HOST, SERVER_PORT)
- return ipc.Requestor(protocol, client)
+ client = avro.ipc.HTTPTransceiver(SERVER_HOST, SERVER_PORT)
+ return avro.ipc.Requestor(protocol, client)
if __name__ == '__main__':
diff --git a/lang/py/avro/test/sample_http_server.py
b/lang/py/avro/test/sample_http_server.py
index 2778173..dea1309 100644
--- a/lang/py/avro/test/sample_http_server.py
+++ b/lang/py/avro/test/sample_http_server.py
@@ -19,8 +19,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import avro.ipc
import avro.protocol
diff --git a/lang/py/avro/test/test_bench.py b/lang/py/avro/test/test_bench.py
index e0be371..302e167 100644
--- a/lang/py/avro/test/test_bench.py
+++ b/lang/py/avro/test/test_bench.py
@@ -17,8 +17,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import argparse
import json
import random
diff --git a/lang/py/avro/test/test_datafile.py
b/lang/py/avro/test/test_datafile.py
index 751edbb..e720eda 100644
--- a/lang/py/avro/test/test_datafile.py
+++ b/lang/py/avro/test/test_datafile.py
@@ -30,7 +30,7 @@ import avro.datafile
import avro.io
import avro.schema
-CODECS_TO_VALIDATE = avro.codecs.Codecs.supported_codec_names()
+CODECS_TO_VALIDATE = avro.codecs.supported_codec_names()
TEST_PAIRS = tuple((avro.schema.parse(schema), datum) for schema, datum in (
('"null"', None),
('"boolean"', True),
diff --git a/lang/py/avro/test/test_datafile_interop.py
b/lang/py/avro/test/test_datafile_interop.py
index 76e8f5d..ec92ebd 100644
--- a/lang/py/avro/test/test_datafile_interop.py
+++ b/lang/py/avro/test/test_datafile_interop.py
@@ -19,13 +19,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import os
import unittest
import avro
-from avro import datafile, io
+import avro.datafile
+import avro.io
_INTEROP_DATA_DIR = os.path.join(os.path.dirname(avro.__file__), 'test',
'interop', 'data')
@@ -39,14 +38,14 @@ class TestDataFileInterop(unittest.TestCase):
filename = os.path.join(_INTEROP_DATA_DIR, f)
assert os.stat(filename).st_size > 0
base_ext = os.path.splitext(os.path.basename(f))[0].split('_', 1)
- if len(base_ext) < 2 or base_ext[1] in datafile.VALID_CODECS:
+ if len(base_ext) < 2 or base_ext[1] in avro.datafile.VALID_CODECS:
print('READING %s' % f)
print()
# read data in binary from file
- datum_reader = io.DatumReader()
+ datum_reader = avro.io.DatumReader()
with open(filename, 'rb') as reader:
- dfr = datafile.DataFileReader(reader, datum_reader)
+ dfr = avro.datafile.DataFileReader(reader, datum_reader)
i = 0
for i, datum in enumerate(dfr, 1):
assert datum is not None
diff --git a/lang/py/avro/test/test_io.py b/lang/py/avro/test/test_io.py
index 464d2ad..750afa4 100644
--- a/lang/py/avro/test/test_io.py
+++ b/lang/py/avro/test/test_io.py
@@ -19,26 +19,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
+import binascii
import datetime
+import decimal
import io
import unittest
-from binascii import hexlify
-from decimal import Decimal
import avro.io
-from avro import schema, timezones
-
-try:
- unicode
-except NameError:
- unicode = str
+import avro.schema
+import avro.timezones
SCHEMAS_TO_VALIDATE = (
('"null"', None),
('"boolean"', True),
- ('"string"', unicode('adsfasdf09809dsf-=adsf')),
+ ('"string"', 'adsfasdf09809dsf-=adsf'),
('"bytes"', b'12345abcd'),
('"int"', 1234),
('"long"', 1234),
@@ -46,16 +40,16 @@ SCHEMAS_TO_VALIDATE = (
('"double"', 1234.0),
('{"type": "fixed", "name": "Test", "size": 1}', b'B'),
('{"type": "fixed", "logicalType": "decimal", "name": "Test", "size": 8,
"precision": 5, "scale": 4}',
- Decimal('3.1415')),
+ decimal.Decimal('3.1415')),
('{"type": "fixed", "logicalType": "decimal", "name": "Test", "size": 8,
"precision": 5, "scale": 4}',
- Decimal('-3.1415')),
- ('{"type": "bytes", "logicalType": "decimal", "precision": 5, "scale":
4}', Decimal('3.1415')),
- ('{"type": "bytes", "logicalType": "decimal", "precision": 5, "scale":
4}', Decimal('-3.1415')),
+ decimal.Decimal('-3.1415')),
+ ('{"type": "bytes", "logicalType": "decimal", "precision": 5, "scale":
4}', decimal.Decimal('3.1415')),
+ ('{"type": "bytes", "logicalType": "decimal", "precision": 5, "scale":
4}', decimal.Decimal('-3.1415')),
('{"type": "enum", "name": "Test", "symbols": ["A", "B"]}', 'B'),
('{"type": "array", "items": "long"}', [1, 3, 2]),
- ('{"type": "map", "values": "long"}', {unicode('a'): 1,
- unicode('b'): 3,
- unicode('c'): 2}),
+ ('{"type": "map", "values": "long"}', {'a': 1,
+ 'b': 3,
+ 'c': 2}),
('["string", "null", "long"]', None),
('{"type": "int", "logicalType": "date"}', datetime.date(2000, 1, 1)),
('{"type": "int", "logicalType": "time-millis"}', datetime.time(23, 59,
59, 999000)),
@@ -64,27 +58,27 @@ SCHEMAS_TO_VALIDATE = (
('{"type": "long", "logicalType": "time-micros"}', datetime.time(0, 0, 0,
000000)),
(
'{"type": "long", "logicalType": "timestamp-millis"}',
- datetime.datetime(1000, 1, 1, 0, 0, 0, 000000, tzinfo=timezones.utc)
+ datetime.datetime(1000, 1, 1, 0, 0, 0, 000000,
tzinfo=avro.timezones.utc)
),
(
'{"type": "long", "logicalType": "timestamp-millis"}',
- datetime.datetime(9999, 12, 31, 23, 59, 59, 999000,
tzinfo=timezones.utc)
+ datetime.datetime(9999, 12, 31, 23, 59, 59, 999000,
tzinfo=avro.timezones.utc)
),
(
'{"type": "long", "logicalType": "timestamp-millis"}',
- datetime.datetime(2000, 1, 18, 2, 2, 1, 100000, tzinfo=timezones.tst)
+ datetime.datetime(2000, 1, 18, 2, 2, 1, 100000,
tzinfo=avro.timezones.tst)
),
(
'{"type": "long", "logicalType": "timestamp-micros"}',
- datetime.datetime(1000, 1, 1, 0, 0, 0, 000000, tzinfo=timezones.utc)
+ datetime.datetime(1000, 1, 1, 0, 0, 0, 000000,
tzinfo=avro.timezones.utc)
),
(
'{"type": "long", "logicalType": "timestamp-micros"}',
- datetime.datetime(9999, 12, 31, 23, 59, 59, 999999,
tzinfo=timezones.utc)
+ datetime.datetime(9999, 12, 31, 23, 59, 59, 999999,
tzinfo=avro.timezones.utc)
),
(
'{"type": "long", "logicalType": "timestamp-micros"}',
- datetime.datetime(2000, 1, 18, 2, 2, 1, 123499, tzinfo=timezones.tst)
+ datetime.datetime(2000, 1, 18, 2, 2, 1, 123499,
tzinfo=avro.timezones.tst)
),
('{"type": "string", "logicalType": "uuid"}', u'12345abcd'),
('{"type": "string", "logicalType": "unknown-logical-type"}',
u'12345abcd'),
@@ -103,7 +97,7 @@ SCHEMAS_TO_VALIDATE = (
"name": "Cons",
"fields": [{"name": "car", "type": "Lisp"},
{"name": "cdr", "type": "Lisp"}]}]}]}
- """, {'value': {'car': {'value': unicode('head')}, 'cdr': {'value':
None}}}),
+ """, {'value': {'car': {'value': 'head'}, 'cdr': {'value': None}}}),
)
BINARY_ENCODINGS = (
@@ -130,14 +124,14 @@ DEFAULT_VALUE_EXAMPLES = (
('{"type": "fixed", "name": "F", "size": 2}', '"\u00FF\u00FF"',
u'\xff\xff'),
('{"type": "enum", "name": "F", "symbols": ["FOO", "BAR"]}', '"FOO"',
'FOO'),
('{"type": "array", "items": "int"}', '[1, 2, 3]', [1, 2, 3]),
- ('{"type": "map", "values": "int"}', '{"a": 1, "b": 2}', {unicode('a'): 1,
- unicode('b'):
2}),
+ ('{"type": "map", "values": "int"}', '{"a": 1, "b": 2}', {'a': 1,
+ 'b': 2}),
('["int", "null"]', '5', 5),
('{"type": "record", "name": "F", "fields": [{"name": "A", "type":
"int"}]}',
'{"A": 5}', {'A': 5}),
)
-LONG_RECORD_SCHEMA = schema.parse("""\
+LONG_RECORD_SCHEMA = avro.schema.parse("""\
{"type": "record",
"name": "Test",
"fields": [{"name": "A", "type": "int"},
@@ -155,10 +149,10 @@ def avro_hexlify(reader):
"""Return the hex value, as a string, of a binary-encoded int or long."""
b = []
current_byte = reader.read(1)
- b.append(hexlify(current_byte))
+ b.append(binascii.hexlify(current_byte))
while (ord(current_byte) & 0x80) != 0:
current_byte = reader.read(1)
- b.append(hexlify(current_byte))
+ b.append(binascii.hexlify(current_byte))
return b' '.join(b)
@@ -191,7 +185,7 @@ def check_binary_encoding(number_type):
print('Datum: %d' % datum)
print('Correct Encoding: %s' % hex_encoding)
- writers_schema = schema.parse('"%s"' % number_type.lower())
+ writers_schema = avro.schema.parse('"%s"' % number_type.lower())
writer, encoder, datum_writer = write_datum(datum, writers_schema)
writer.seek(0)
hex_val = avro_hexlify(writer)
@@ -211,7 +205,7 @@ def check_skip_number(number_type):
print('Value to Skip: %d' % value_to_skip)
# write the value to skip and a known value
- writers_schema = schema.parse('"%s"' % number_type.lower())
+ writers_schema = avro.schema.parse('"%s"' % number_type.lower())
writer, encoder, datum_writer = write_datum(value_to_skip,
writers_schema)
datum_writer.write(VALUE_TO_READ, encoder)
@@ -242,7 +236,7 @@ class TestIO(unittest.TestCase):
for example_schema, datum in SCHEMAS_TO_VALIDATE:
print('Schema: %s' % example_schema)
print('Datum: %s' % datum)
- validated = avro.io.validate(schema.parse(example_schema), datum)
+ validated = avro.io.validate(avro.schema.parse(example_schema),
datum)
print('Valid: %s' % validated)
if validated:
passed += 1
@@ -255,16 +249,16 @@ class TestIO(unittest.TestCase):
print('Schema: %s' % example_schema)
print('Datum: %s' % datum)
- writers_schema = schema.parse(example_schema)
+ writers_schema = avro.schema.parse(example_schema)
writer, encoder, datum_writer = write_datum(datum, writers_schema)
round_trip_datum = read_datum(writer, writers_schema)
print('Round Trip Datum: %s' % round_trip_datum)
- if isinstance(round_trip_datum, Decimal):
+ if isinstance(round_trip_datum, decimal.Decimal):
round_trip_datum = round_trip_datum.to_eng_string()
datum = str(datum)
elif isinstance(round_trip_datum, datetime.datetime):
- datum = datum.astimezone(tz=timezones.utc)
+ datum = datum.astimezone(tz=avro.timezones.utc)
if datum == round_trip_datum:
correct += 1
self.assertEqual(correct, len(SCHEMAS_TO_VALIDATE))
@@ -300,10 +294,10 @@ class TestIO(unittest.TestCase):
promotable_schemas = ['"int"', '"long"', '"float"', '"double"']
incorrect = 0
for i, ws in enumerate(promotable_schemas):
- writers_schema = schema.parse(ws)
+ writers_schema = avro.schema.parse(ws)
datum_to_write = 219
for rs in promotable_schemas[i + 1:]:
- readers_schema = schema.parse(rs)
+ readers_schema = avro.schema.parse(rs)
writer, enc, dw = write_datum(datum_to_write, writers_schema)
datum_read = read_datum(writer, writers_schema, readers_schema)
print('Writer: %s Reader: %s' % (writers_schema,
readers_schema))
@@ -314,12 +308,12 @@ class TestIO(unittest.TestCase):
def test_unknown_symbol(self):
print_test_name('TEST UNKNOWN SYMBOL')
- writers_schema = schema.parse("""\
+ writers_schema = avro.schema.parse("""\
{"type": "enum", "name": "Test",
"symbols": ["FOO", "BAR"]}""")
datum_to_write = 'FOO'
- readers_schema = schema.parse("""\
+ readers_schema = avro.schema.parse("""\
{"type": "enum", "name": "Test",
"symbols": ["BAR", "BAZ"]}""")
@@ -336,7 +330,7 @@ class TestIO(unittest.TestCase):
correct = 0
for field_type, default_json, default_datum in DEFAULT_VALUE_EXAMPLES:
- readers_schema = schema.parse("""\
+ readers_schema = avro.schema.parse("""\
{"type": "record", "name": "Test",
"fields": [{"name": "H", "type": %s, "default": %s}]}
""" % (field_type, default_json))
@@ -354,7 +348,7 @@ class TestIO(unittest.TestCase):
writers_schema = LONG_RECORD_SCHEMA
datum_to_write = LONG_RECORD_DATUM
- readers_schema = schema.parse("""\
+ readers_schema = avro.schema.parse("""\
{"type": "record", "name": "Test",
"fields": [{"name": "H", "type": "int"}]}""")
@@ -369,7 +363,7 @@ class TestIO(unittest.TestCase):
writers_schema = LONG_RECORD_SCHEMA
datum_to_write = LONG_RECORD_DATUM
- readers_schema = schema.parse("""\
+ readers_schema = avro.schema.parse("""\
{"type": "record", "name": "Test",
"fields": [{"name": "E", "type": "int"},
{"name": "F", "type": "int"}]}""")
@@ -385,7 +379,7 @@ class TestIO(unittest.TestCase):
writers_schema = LONG_RECORD_SCHEMA
datum_to_write = LONG_RECORD_DATUM
- readers_schema = schema.parse("""\
+ readers_schema = avro.schema.parse("""\
{"type": "record", "name": "Test",
"fields": [{"name": "F", "type": "int"},
{"name": "E", "type": "int"}]}""")
@@ -398,7 +392,7 @@ class TestIO(unittest.TestCase):
def test_type_exception(self):
print_test_name('TEST TYPE EXCEPTION')
- writers_schema = schema.parse("""\
+ writers_schema = avro.schema.parse("""\
{"type": "record", "name": "Test",
"fields": [{"name": "F", "type": "int"},
{"name": "E", "type": "int"}]}""")
diff --git a/lang/py/avro/test/test_ipc.py b/lang/py/avro/test/test_ipc.py
index 4ae6bd7..0957d49 100644
--- a/lang/py/avro/test/test_ipc.py
+++ b/lang/py/avro/test/test_ipc.py
@@ -24,13 +24,9 @@ There are currently no IPC tests within python, in part
because there are no
servers yet available.
"""
-from __future__ import absolute_import, division, print_function
-
import unittest
-# This test does import this code, to make sure it at least passes
-# compilation.
-from avro import ipc
+import avro.ipc
class TestIPC(unittest.TestCase):
@@ -38,10 +34,10 @@ class TestIPC(unittest.TestCase):
pass
def test_server_with_path(self):
- client_with_custom_path = ipc.HTTPTransceiver('apache.org', 80,
'/service/article')
+ client_with_custom_path = avro.ipc.HTTPTransceiver('apache.org', 80,
'/service/article')
self.assertEqual('/service/article',
client_with_custom_path.req_resource)
- client_with_default_path = ipc.HTTPTransceiver('apache.org', 80)
+ client_with_default_path = avro.ipc.HTTPTransceiver('apache.org', 80)
self.assertEqual('/', client_with_default_path.req_resource)
diff --git a/lang/py/avro/test/test_protocol.py
b/lang/py/avro/test/test_protocol.py
index 0916a9b..57118c3 100644
--- a/lang/py/avro/test/test_protocol.py
+++ b/lang/py/avro/test/test_protocol.py
@@ -21,8 +21,6 @@
"""Test the protocol parsing logic."""
-from __future__ import absolute_import, division, print_function
-
import json
import unittest
@@ -30,22 +28,12 @@ import avro.errors
import avro.protocol
import avro.schema
-try:
- unicode
-except NameError:
- unicode = str
-
-try:
- basestring # type: ignore
-except NameError:
- basestring = (bytes, unicode)
-
-class TestProtocol(object):
+class TestProtocol:
"""A proxy for a protocol string that provides useful test metadata."""
def __init__(self, data, name='', comment=''):
- if not isinstance(data, basestring):
+ if not isinstance(data, str):
data = json.dumps(data)
self.data = data
self.name = name or data
diff --git a/lang/py/avro/test/test_schema.py b/lang/py/avro/test/test_schema.py
index 6988801..7ce5a1c 100644
--- a/lang/py/avro/test/test_schema.py
+++ b/lang/py/avro/test/test_schema.py
@@ -21,36 +21,20 @@
"""Test the schema parsing logic."""
-from __future__ import absolute_import, division, print_function
-
import json
import unittest
import warnings
+from typing import List
import avro.errors
-from avro import schema
-
-try:
- unicode
-except NameError:
- unicode = str
-
-try:
- basestring # type: ignore
-except NameError:
- basestring = (bytes, unicode)
-
-try:
- from typing import List
-except ImportError:
- pass
+import avro.schema
-class TestSchema(object):
+class TestSchema:
"""A proxy for a schema string that provides useful test metadata."""
def __init__(self, data, name='', comment='', warnings=None):
- if not isinstance(data, basestring):
+ if not isinstance(data, str):
data = json.dumps(data)
self.data = data
self.name = name or data # default to data for name
@@ -58,7 +42,7 @@ class TestSchema(object):
self.warnings = warnings
def parse(self):
- return schema.parse(str(self))
+ return avro.schema.parse(str(self))
def __str__(self):
return str(self.data)
@@ -78,8 +62,8 @@ PRIMITIVE_EXAMPLES = [InvalidTestSchema('"True"')] # type:
List[TestSchema]
PRIMITIVE_EXAMPLES.append(InvalidTestSchema('True'))
PRIMITIVE_EXAMPLES.append(InvalidTestSchema('{"no_type": "test"}'))
PRIMITIVE_EXAMPLES.append(InvalidTestSchema('{"type": "panther"}'))
-PRIMITIVE_EXAMPLES.extend([ValidTestSchema('"{}"'.format(t)) for t in
schema.PRIMITIVE_TYPES])
-PRIMITIVE_EXAMPLES.extend([ValidTestSchema({"type": t}) for t in
schema.PRIMITIVE_TYPES])
+PRIMITIVE_EXAMPLES.extend([ValidTestSchema('"{}"'.format(t)) for t in
avro.schema.PRIMITIVE_TYPES])
+PRIMITIVE_EXAMPLES.extend([ValidTestSchema({"type": t}) for t in
avro.schema.PRIMITIVE_TYPES])
FIXED_EXAMPLES = [
ValidTestSchema({"type": "fixed", "name": "Test", "size": 1}),
@@ -330,7 +314,7 @@ class TestMisc(unittest.TestCase):
def test_correct_recursive_extraction(self):
"""A recursive reference within a schema should be the same type every
time."""
- s = schema.parse('''{
+ s = avro.schema.parse('''{
"type": "record",
"name": "X",
"fields": [{
@@ -341,64 +325,64 @@ class TestMisc(unittest.TestCase):
"fields": [{"name": "Z", "type": "X"}]}
}]
}''')
- t = schema.parse(str(s.fields[0].type))
+ t = avro.schema.parse(str(s.fields[0].type))
    # If we've made it this far, the subschema was reasonably stringified;
it could be reparsed.
self.assertEqual("X", t.fields[0].type.name)
def test_name_is_none(self):
"""When a name is None its namespace is None."""
- self.assertIsNone(schema.Name(None, None, None).fullname)
- self.assertIsNone(schema.Name(None, None, None).space)
+ self.assertIsNone(avro.schema.Name(None, None, None).fullname)
+ self.assertIsNone(avro.schema.Name(None, None, None).space)
def test_name_not_empty_string(self):
"""A name cannot be the empty string."""
- self.assertRaises(avro.errors.SchemaParseException, schema.Name, "",
None, None)
+ self.assertRaises(avro.errors.SchemaParseException, avro.schema.Name,
"", None, None)
def test_name_space_specified(self):
"""Space combines with a name to become the fullname."""
# name and namespace specified
- fullname = schema.Name('a', 'o.a.h', None).fullname
+ fullname = avro.schema.Name('a', 'o.a.h', None).fullname
self.assertEqual(fullname, 'o.a.h.a')
def test_fullname_space_specified(self):
"""When name contains dots, namespace should be ignored."""
- fullname = schema.Name('a.b.c.d', 'o.a.h', None).fullname
+ fullname = avro.schema.Name('a.b.c.d', 'o.a.h', None).fullname
self.assertEqual(fullname, 'a.b.c.d')
def test_name_default_specified(self):
"""Default space becomes the namespace when the namespace is None."""
- fullname = schema.Name('a', None, 'b.c.d').fullname
+ fullname = avro.schema.Name('a', None, 'b.c.d').fullname
self.assertEqual(fullname, 'b.c.d.a')
def test_fullname_default_specified(self):
"""When a name contains dots, default space should be ignored."""
- fullname = schema.Name('a.b.c.d', None, 'o.a.h').fullname
+ fullname = avro.schema.Name('a.b.c.d', None, 'o.a.h').fullname
self.assertEqual(fullname, 'a.b.c.d')
def test_fullname_space_default_specified(self):
"""When a name contains dots, namespace and default space should be
ignored."""
- fullname = schema.Name('a.b.c.d', 'o.a.a', 'o.a.h').fullname
+ fullname = avro.schema.Name('a.b.c.d', 'o.a.a', 'o.a.h').fullname
self.assertEqual(fullname, 'a.b.c.d')
def test_name_space_default_specified(self):
"""When name and space are specified, default space should be
ignored."""
- fullname = schema.Name('a', 'o.a.a', 'o.a.h').fullname
+ fullname = avro.schema.Name('a', 'o.a.a', 'o.a.h').fullname
self.assertEqual(fullname, 'o.a.a.a')
def test_equal_names(self):
"""Equality of names is defined on the fullname and is
case-sensitive."""
- self.assertEqual(schema.Name('a.b.c.d', None, None), schema.Name('d',
'a.b.c', None))
- self.assertNotEqual(schema.Name('C.d', None, None), schema.Name('c.d',
None, None))
+ self.assertEqual(avro.schema.Name('a.b.c.d', None, None),
avro.schema.Name('d', 'a.b.c', None))
+ self.assertNotEqual(avro.schema.Name('C.d', None, None),
avro.schema.Name('c.d', None, None))
def test_invalid_name(self):
"""The name portion of a fullname, record field names, and enum
symbols must:
start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]"""
- self.assertRaises(avro.errors.InvalidName, schema.Name, 'an especially
spacey cowboy', None, None)
- self.assertRaises(avro.errors.InvalidName, schema.Name, '99 problems
but a name aint one', None, None)
+ self.assertRaises(avro.errors.InvalidName, avro.schema.Name, 'an
especially spacey cowboy', None, None)
+ self.assertRaises(avro.errors.InvalidName, avro.schema.Name, '99
problems but a name aint one', None, None)
def test_null_namespace(self):
"""The empty string may be used as a namespace to indicate the null
namespace."""
- name = schema.Name('name', "", None)
+ name = avro.schema.Name('name', "", None)
self.assertEqual(name.fullname, "name")
self.assertIsNone(name.space)
@@ -406,7 +390,7 @@ class TestMisc(unittest.TestCase):
"""A specific exception message should appear on a json parse error."""
self.assertRaisesRegexp(avro.errors.SchemaParseException,
r'Error parsing JSON: /not/a/real/file',
- schema.parse,
+ avro.schema.parse,
'/not/a/real/file')
def test_decimal_valid_type(self):
@@ -443,8 +427,8 @@ class TestMisc(unittest.TestCase):
"size": 8})
fixed_decimal = fixed_decimal_schema.parse()
- self.assertIsInstance(fixed_decimal, schema.FixedSchema)
- self.assertIsInstance(fixed_decimal, schema.DecimalLogicalSchema)
+ self.assertIsInstance(fixed_decimal, avro.schema.FixedSchema)
+ self.assertIsInstance(fixed_decimal, avro.schema.DecimalLogicalSchema)
def test_fixed_decimal_invalid_max_precision(self):
# An 8 byte number can't represent every 19 digit number, so the
logical
@@ -458,8 +442,8 @@ class TestMisc(unittest.TestCase):
"size": 8})
fixed_decimal = fixed_decimal_schema.parse()
- self.assertIsInstance(fixed_decimal, schema.FixedSchema)
- self.assertNotIsInstance(fixed_decimal, schema.DecimalLogicalSchema)
+ self.assertIsInstance(fixed_decimal, avro.schema.FixedSchema)
+ self.assertNotIsInstance(fixed_decimal,
avro.schema.DecimalLogicalSchema)
def test_parse_invalid_symbol(self):
    """Disabling enum schema symbol validation should allow invalid symbols
to pass."""
@@ -467,7 +451,7 @@ class TestMisc(unittest.TestCase):
"type": "enum", "name": "AVRO2174", "symbols": ["white space"]})
try:
- case = schema.parse(test_schema_string, validate_enum_symbols=True)
+ case = avro.schema.parse(test_schema_string,
validate_enum_symbols=True)
except avro.errors.InvalidName:
pass
else:
@@ -475,7 +459,7 @@ class TestMisc(unittest.TestCase):
"an invalid symbol should raise InvalidName.")
try:
- case = schema.parse(test_schema_string,
validate_enum_symbols=False)
+ case = avro.schema.parse(test_schema_string,
validate_enum_symbols=False)
except avro.errors.InvalidName:
self.fail("When enum symbol validation is disabled, "
"an invalid symbol should not raise InvalidName.")
@@ -535,7 +519,7 @@ class RoundTripParseTestCase(unittest.TestCase):
def parse_round_trip(self):
"""The string of a Schema should be parseable to the same Schema."""
parsed = self.test_schema.parse()
- round_trip = schema.parse(str(parsed))
+ round_trip = avro.schema.parse(str(parsed))
self.assertEqual(parsed, round_trip)
@@ -569,7 +553,7 @@ class OtherAttributesTestCase(unittest.TestCase):
"cp_int": int,
"cp_null": type(None),
"cp_object": dict,
- "cp_string": basestring,
+ "cp_string": str,
}
def __init__(self, test_schema):
@@ -588,7 +572,7 @@ class OtherAttributesTestCase(unittest.TestCase):
def check_attributes(self):
"""Other attributes and their types on a schema should be preserved."""
sch = self.test_schema.parse()
- round_trip = schema.parse(str(sch))
+ round_trip = avro.schema.parse(str(sch))
self.assertEqual(sch.other_props, round_trip.other_props,
"Properties were not preserved in a round-trip
parse.")
self._check_props(sch.other_props)
diff --git a/lang/py/avro/test/test_script.py b/lang/py/avro/test/test_script.py
index 7f2d16e..633cdaf 100644
--- a/lang/py/avro/test/test_script.py
+++ b/lang/py/avro/test/test_script.py
@@ -19,27 +19,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import csv
import io
import json
+import operator
+import os
+import os.path
+import subprocess
import sys
+import tempfile
import unittest
-from operator import itemgetter
-from os import remove
-from os.path import dirname, isfile, join
-from subprocess import check_call, check_output
-from tempfile import NamedTemporaryFile
+import avro.datafile
+import avro.io
import avro.schema
-from avro.datafile import DataFileWriter
-from avro.io import DatumWriter
-
-try:
- unicode
-except NameError:
- unicode = str
NUM_RECORDS = 7
@@ -58,13 +51,13 @@ SCHEMA = '''
'''
LOONIES = (
- (unicode("daffy"), unicode("duck"), unicode("duck")),
- (unicode("bugs"), unicode("bunny"), unicode("bunny")),
- (unicode("tweety"), unicode(""), unicode("bird")),
- (unicode("road"), unicode("runner"), unicode("bird")),
- (unicode("wile"), unicode("e"), unicode("coyote")),
- (unicode("pepe"), unicode("le pew"), unicode("skunk")),
- (unicode("foghorn"), unicode("leghorn"), unicode("rooster")),
+ ("daffy", "duck", "duck"),
+ ("bugs", "bunny", "bunny"),
+ ("tweety", "", "bird"),
+ ("road", "runner", "bird"),
+ ("wile", "e", "coyote"),
+ ("pepe", "le pew", "skunk"),
+ ("foghorn", "leghorn", "rooster"),
)
@@ -73,7 +66,7 @@ def looney_records():
yield {"first": f, "last": l, "type": t}
-SCRIPT = join(dirname(dirname(dirname(__file__))), "scripts", "avro")
+SCRIPT = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "scripts", "avro")
_JSON_PRETTY = '''{
"first": "daffy",
@@ -85,28 +78,30 @@ _JSON_PRETTY = '''{
def gen_avro(filename):
schema = avro.schema.parse(SCHEMA)
fo = open(filename, "wb")
- writer = DataFileWriter(fo, DatumWriter(), schema)
+ writer = avro.datafile.DataFileWriter(fo, avro.io.DatumWriter(), schema)
for record in looney_records():
writer.append(record)
writer.close()
fo.close()
-def tempfile():
- return NamedTemporaryFile(delete=False).name
+def _tempfile():
+ with tempfile.NamedTemporaryFile(delete=False) as f:
+ pass
+ return f.name
class TestCat(unittest.TestCase):
def setUp(self):
- self.avro_file = tempfile()
+ self.avro_file = _tempfile()
gen_avro(self.avro_file)
def tearDown(self):
- if isfile(self.avro_file):
- remove(self.avro_file)
+ if os.path.isfile(self.avro_file):
+ os.unlink(self.avro_file)
def _run(self, *args, **kw):
- out = check_output([sys.executable, SCRIPT, "cat", self.avro_file] + list(args)).decode()
+ out = subprocess.check_output([sys.executable, SCRIPT, "cat", self.avro_file] + list(args)).decode()
if kw.get("raw"):
return out
return out.splitlines()
@@ -126,7 +121,7 @@ class TestCat(unittest.TestCase):
assert len(list(reader)) == NUM_RECORDS
def test_csv_header(self):
- r = {"type": unicode("duck"), "last": unicode("duck"), "first":
unicode("daffy")}
+ r = {"type": "duck", "last": "duck", "first": "daffy"}
out = self._run("-f", "csv", "--header", raw=True)
io_ = io.StringIO(out)
reader = csv.DictReader(io_)
@@ -146,7 +141,7 @@ class TestCat(unittest.TestCase):
self.assertEqual(out.strip(), _JSON_PRETTY.strip())
def test_version(self):
- check_output([sys.executable, SCRIPT, "cat", "--version"])
+ subprocess.check_output([sys.executable, SCRIPT, "cat", "--version"])
def test_files(self):
out = self._run(self.avro_file)
@@ -159,37 +154,37 @@ class TestCat(unittest.TestCase):
# Field selection (with comma and space)
out = self._run('--fields', 'first, last')
- assert json.loads(out[0]) == {'first': unicode('daffy'), 'last':
unicode('duck')}
+ assert json.loads(out[0]) == {'first': 'daffy', 'last': 'duck'}
# Empty fields should get all
out = self._run('--fields', '')
assert json.loads(out[0]) == \
- {'first': unicode('daffy'), 'last': unicode('duck'),
- 'type': unicode('duck')}
+ {'first': 'daffy', 'last': 'duck',
+ 'type': 'duck'}
# Non existing fields are ignored
out = self._run('--fields', 'first,last,age')
- assert json.loads(out[0]) == {'first': unicode('daffy'), 'last':
unicode('duck')}
+ assert json.loads(out[0]) == {'first': 'daffy', 'last': 'duck'}
class TestWrite(unittest.TestCase):
def setUp(self):
- self.json_file = tempfile() + ".json"
+ self.json_file = _tempfile() + ".json"
fo = open(self.json_file, "w")
for record in looney_records():
json.dump(record, fo)
fo.write("\n")
fo.close()
- self.csv_file = tempfile() + ".csv"
+ self.csv_file = _tempfile() + ".csv"
fo = open(self.csv_file, "w")
write = csv.writer(fo).writerow
- get = itemgetter("first", "last", "type")
+ get = operator.itemgetter("first", "last", "type")
for record in looney_records():
write(get(record))
fo.close()
- self.schema_file = tempfile()
+ self.schema_file = _tempfile()
fo = open(self.schema_file, "w")
fo.write(SCHEMA)
fo.close()
@@ -197,31 +192,31 @@ class TestWrite(unittest.TestCase):
def tearDown(self):
for filename in (self.csv_file, self.json_file, self.schema_file):
try:
- remove(filename)
+ os.unlink(filename)
except OSError:
continue
def _run(self, *args, **kw):
args = [sys.executable, SCRIPT, "write", "--schema", self.schema_file]
+ list(args)
- check_call(args, **kw)
+ subprocess.check_call(args, **kw)
def load_avro(self, filename):
- out = check_output([sys.executable, SCRIPT, "cat", filename]).decode()
+ out = subprocess.check_output([sys.executable, SCRIPT, "cat",
filename]).decode()
return [json.loads(o) for o in out.splitlines()]
def test_version(self):
- check_call([sys.executable, SCRIPT, "write", "--version"])
+ subprocess.check_call([sys.executable, SCRIPT, "write", "--version"])
def format_check(self, format, filename):
- tmp = tempfile()
+ tmp = _tempfile()
with open(tmp, "wb") as fo:
self._run(filename, "-f", format, stdout=fo)
records = self.load_avro(tmp)
assert len(records) == NUM_RECORDS
- assert records[0]["first"] == unicode("daffy")
+ assert records[0]["first"] == "daffy"
- remove(tmp)
+ os.unlink(tmp)
def test_write_json(self):
self.format_check("json", self.json_file)
@@ -230,24 +225,24 @@ class TestWrite(unittest.TestCase):
self.format_check("csv", self.csv_file)
def test_outfile(self):
- tmp = tempfile()
- remove(tmp)
+ tmp = _tempfile()
+ os.unlink(tmp)
self._run(self.json_file, "-o", tmp)
assert len(self.load_avro(tmp)) == NUM_RECORDS
- remove(tmp)
+ os.unlink(tmp)
def test_multi_file(self):
- tmp = tempfile()
+ tmp = _tempfile()
with open(tmp, 'wb') as o:
self._run(self.json_file, self.json_file, stdout=o)
assert len(self.load_avro(tmp)) == 2 * NUM_RECORDS
- remove(tmp)
+ os.unlink(tmp)
def test_stdin(self):
- tmp = tempfile()
+ tmp = _tempfile()
info = open(self.json_file, "rb")
self._run("--input-type", "json", "-o", tmp, stdin=info)
assert len(self.load_avro(tmp)) == NUM_RECORDS
- remove(tmp)
+ os.unlink(tmp)
diff --git a/lang/py/avro/test/test_tether_task.py
b/lang/py/avro/test/test_tether_task.py
index 47ff6bb..40aec03 100644
--- a/lang/py/avro/test/test_tether_task.py
+++ b/lang/py/avro/test/test_tether_task.py
@@ -19,8 +19,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import io
import os
import subprocess
@@ -33,12 +31,6 @@ import avro.test.mock_tether_parent
import avro.test.word_count_task
import avro.tether.tether_task
import avro.tether.util
-from avro import schema, tether
-
-try:
- unicode
-except NameError:
- unicode = str
class TestTetherTask(unittest.TestCase):
@@ -79,7 +71,7 @@ class TestTetherTask(unittest.TestCase):
)
# Serialize some data so we can send it to the input function
- datum = unicode("This is a line of text")
+ datum = "This is a line of text"
writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(writer)
datum_writer = avro.io.DatumWriter(task.inschema)
@@ -99,7 +91,7 @@ class TestTetherTask(unittest.TestCase):
)
# Serialize some data so we can send it to the input function
- datum = {"key": unicode("word"), "value": 2}
+ datum = {"key": "word", "value": 2}
writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(writer)
datum_writer = avro.io.DatumWriter(task.midschema)
@@ -114,7 +106,7 @@ class TestTetherTask(unittest.TestCase):
task.complete()
# try a status
- task.status(unicode("Status message"))
+ task.status("Status message")
finally:
# close the process
if not(proc is None):
diff --git a/lang/py/avro/test/test_tether_task_runner.py
b/lang/py/avro/test/test_tether_task_runner.py
index 5e7a0d8..4d31349 100644
--- a/lang/py/avro/test/test_tether_task_runner.py
+++ b/lang/py/avro/test/test_tether_task_runner.py
@@ -19,8 +19,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import io
import logging
import os
@@ -36,11 +34,6 @@ import avro.tether.tether_task
import avro.tether.tether_task_runner
import avro.tether.util
-try:
- unicode
-except NameError:
- unicode = str
-
class TestTetherTaskRunner(unittest.TestCase):
"""unit test for a tethered task runner."""
@@ -85,12 +78,12 @@ class TestTetherTaskRunner(unittest.TestCase):
# Test the mapper
requestor.request("configure", {
"taskType": avro.tether.tether_task.TaskType.MAP,
- "inSchema": unicode(str(runner.task.inschema)),
- "outSchema": unicode(str(runner.task.midschema))
+ "inSchema": str(runner.task.inschema),
+ "outSchema": str(runner.task.midschema)
})
# Serialize some data so we can send it to the input function
- datum = unicode("This is a line of text")
+ datum = "This is a line of text"
writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(writer)
datum_writer = avro.io.DatumWriter(runner.task.inschema)
@@ -105,12 +98,12 @@ class TestTetherTaskRunner(unittest.TestCase):
# Test the reducer
requestor.request("configure", {
"taskType": avro.tether.tether_task.TaskType.REDUCE,
- "inSchema": unicode(str(runner.task.midschema)),
- "outSchema": unicode(str(runner.task.outschema))}
+ "inSchema": str(runner.task.midschema),
+ "outSchema": str(runner.task.outschema)}
)
# Serialize some data so we can send it to the input function
- datum = {"key": unicode("word"), "value": 2}
+ datum = {"key": "word", "value": 2}
writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(writer)
datum_writer = avro.io.DatumWriter(runner.task.midschema)
diff --git a/lang/py/avro/test/test_tether_word_count.py
b/lang/py/avro/test/test_tether_word_count.py
index becee00..eaf0ff5 100644
--- a/lang/py/avro/test/test_tether_word_count.py
+++ b/lang/py/avro/test/test_tether_word_count.py
@@ -19,8 +19,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import collections
import distutils.spawn
import os
@@ -37,11 +35,6 @@ import avro.io
import avro.schema
import avro.tether.tether_task_runner
-try:
- unicode
-except NameError:
- unicode = str
-
_AVRO_DIR = os.path.abspath(os.path.dirname(avro.__file__))
@@ -55,9 +48,9 @@ _AVRO_VERSION = _version()
_JAR_PATH = os.path.join(os.path.dirname(os.path.dirname(_AVRO_DIR)),
"java", "tools", "target",
"avro-tools-{}.jar".format(_AVRO_VERSION))
-_LINES = (unicode("the quick brown fox jumps over the lazy dog"),
- unicode("the cow jumps over the moon"),
- unicode("the rain in spain falls mainly on the plains"))
+_LINES = ("the quick brown fox jumps over the lazy dog",
+ "the cow jumps over the moon",
+ "the rain in spain falls mainly on the plains")
_IN_SCHEMA = '"string"'
# The schema for the output of the mapper and reducer
diff --git a/lang/py/avro/test/word_count_task.py
b/lang/py/avro/test/word_count_task.py
index 19c9b3d..3e3d79e 100644
--- a/lang/py/avro/test/word_count_task.py
+++ b/lang/py/avro/test/word_count_task.py
@@ -19,8 +19,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import logging
import avro.tether.tether_task
diff --git a/lang/py/avro/tether/__init__.py b/lang/py/avro/tether/__init__.py
index 6c61872..603f1bf 100644
--- a/lang/py/avro/tether/__init__.py
+++ b/lang/py/avro/tether/__init__.py
@@ -20,8 +20,6 @@
# specific language governing permissions and limitations
# under the License.
-from __future__ import absolute_import, division, print_function
-
from avro.tether.tether_task import HTTPRequestor, TaskType, TetherTask,
inputProtocol, outputProtocol
from avro.tether.tether_task_runner import TaskRunner
from avro.tether.util import find_port
diff --git a/lang/py/avro/tether/tether_task.py
b/lang/py/avro/tether/tether_task.py
index d4902d0..dafb42c 100644
--- a/lang/py/avro/tether/tether_task.py
+++ b/lang/py/avro/tether/tether_task.py
@@ -19,8 +19,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
+import abc
import collections
import io
import logging
@@ -31,7 +30,9 @@ import traceback
import avro.errors
import avro.io
-from avro import ipc, protocol, schema
+import avro.ipc
+import avro.protocol
+import avro.schema
__all__ = ["TetherTask", "TaskType", "inputProtocol", "outputProtocol",
"HTTPRequestor"]
@@ -44,7 +45,7 @@ pfile = os.path.split(__file__)[0] + os.sep +
"InputProtocol.avpr"
with open(pfile, 'r') as hf:
prototxt = hf.read()
-inputProtocol = protocol.parse(prototxt)
+inputProtocol = avro.protocol.parse(prototxt)
# use a named tuple to represent the tasktype enumeration
taskschema = inputProtocol.types_dict["TaskType"]
@@ -58,10 +59,10 @@ pfile = os.path.split(__file__)[0] + os.sep +
"OutputProtocol.avpr"
with open(pfile, 'r') as hf:
prototxt = hf.read()
-outputProtocol = protocol.parse(prototxt)
+outputProtocol = avro.protocol.parse(prototxt)
-class Collector(object):
+class Collector:
"""
Collector for map and reduce output values
"""
@@ -75,8 +76,8 @@ class Collector(object):
outputClient - The output client used to send messages to the parent
"""
- if not isinstance(scheme, schema.Schema):
- scheme = schema.parse(scheme)
+ if not isinstance(scheme, avro.schema.Schema):
+ scheme = avro.schema.parse(scheme)
self.scheme = scheme
@@ -122,7 +123,7 @@ def keys_are_equal(rec1, rec2, fkeys):
return True
-class HTTPRequestor(object):
+class HTTPRequestor:
"""
This is a small requestor subclass I created for the HTTP protocol.
Since the HTTP protocol isn't persistent, we need to instantiate
@@ -147,12 +148,12 @@ class HTTPRequestor(object):
self.protocol = protocol
def request(self, *args, **param):
- transciever = ipc.HTTPTransceiver(self.server, self.port)
- requestor = ipc.Requestor(self.protocol, transciever)
+ transciever = avro.ipc.HTTPTransceiver(self.server, self.port)
+ requestor = avro.ipc.Requestor(self.protocol, transciever)
return requestor.request(*args, **param)
-class TetherTask(object):
+class TetherTask(abc.ABC):
"""
Base class for python tether mapreduce programs.
@@ -193,9 +194,9 @@ class TetherTask(object):
"""
# make sure we can parse the schemas
# Should we call fail if we can't parse the schemas?
- self.inschema = schema.parse(inschema)
- self.midschema = schema.parse(midschema)
- self.outschema = schema.parse(outschema)
+ self.inschema = avro.schema.parse(inschema)
+ self.midschema = avro.schema.parse(midschema)
+ self.outschema = avro.schema.parse(outschema)
# declare various variables
self.clienTransciever = None
@@ -250,7 +251,7 @@ class TetherTask(object):
self.log.info("TetherTask.open: Opening connection to parent server on
port={0}".format(clientPort))
- # self.outputClient = ipc.Requestor(outputProtocol,
self.clientTransceiver)
+ # self.outputClient = avro.ipc.Requestor(outputProtocol,
self.clientTransceiver)
# since HTTP is stateless, a new transciever
# is created and closed for each request. We therefore set
clientTransciever to None
# We still declare clientTransciever because for other (state)
protocols we will need
@@ -284,8 +285,8 @@ class TetherTask(object):
self.taskType = taskType
try:
- inSchema = schema.parse(inSchemaText)
- outSchema = schema.parse(outSchemaText)
+ inSchema = avro.schema.parse(inSchemaText)
+ outSchema = avro.schema.parse(outSchemaText)
if (taskType == TaskType.MAP):
self.inReader = avro.io.DatumReader(writers_schema=inSchema,
readers_schema=self.inschema)
@@ -366,6 +367,7 @@ class TetherTask(object):
self.outputClient.request("complete", dict())
+ @abc.abstractmethod
def map(self, record, collector):
"""Called with input values to generate intermediat values (i.e mapper
output).
@@ -378,8 +380,7 @@ class TetherTask(object):
subclass.
"""
- raise NotImplementedError("This is an abstract method which should be
overloaded in the subclass")
-
+ @abc.abstractmethod
def reduce(self, record, collector):
""" Called with input values to generate reducer output. Inputs are
sorted by the mapper
key.
@@ -396,8 +397,7 @@ class TetherTask(object):
subclass.
"""
- raise NotImplementedError("This is an abstract method which should be
overloaded in the subclass")
-
+ @abc.abstractmethod
def reduceFlush(self, record, collector):
"""
Called with the last intermediate value in each equivalence run.
@@ -408,7 +408,6 @@ class TetherTask(object):
------------------------------------------------------------------
record - the last record on which reduce was invoked.
"""
- raise NotImplementedError("This is an abstract method which should be
overloaded in the subclass")
def status(self, message):
"""
diff --git a/lang/py/avro/tether/tether_task_runner.py
b/lang/py/avro/tether/tether_task_runner.py
index f5f5767..98f694e 100644
--- a/lang/py/avro/tether/tether_task_runner.py
+++ b/lang/py/avro/tether/tether_task_runner.py
@@ -19,8 +19,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
+import http.server
import logging
import sys
import threading
@@ -28,19 +27,14 @@ import traceback
import weakref
import avro.errors
+import avro.ipc
import avro.tether.tether_task
import avro.tether.util
-from avro import ipc
-
-try:
- import BaseHTTPServer as http_server # type: ignore
-except ImportError:
- from http import server as http_server # type: ignore
__all__ = ["TaskRunner"]
-class TaskRunnerResponder(ipc.Responder):
+class TaskRunnerResponder(avro.ipc.Responder):
"""
The responder for the tethered process
"""
@@ -51,7 +45,7 @@ class TaskRunnerResponder(ipc.Responder):
----------------------------------------------------------
runner - Instance of TaskRunner
"""
- ipc.Responder.__init__(self, avro.tether.tether_task.inputProtocol)
+ avro.ipc.Responder.__init__(self,
avro.tether.tether_task.inputProtocol)
self.log = logging.getLogger("TaskRunnerResponder")
@@ -113,7 +107,7 @@ def HTTPHandlerGen(runner):
else:
runnerref = runner
- class TaskRunnerHTTPHandler(http_server.BaseHTTPRequestHandler):
+ class TaskRunnerHTTPHandler(http.server.BaseHTTPRequestHandler):
"""Create a handler for the parent.
"""
@@ -122,23 +116,23 @@ def HTTPHandlerGen(runner):
def __init__(self, *args, **param):
"""
"""
- http_server.BaseHTTPRequestHandler.__init__(self, *args, **param)
+ http.server.BaseHTTPRequestHandler.__init__(self, *args, **param)
def do_POST(self):
self.responder = TaskRunnerResponder(self.runner)
- call_request_reader = ipc.FramedReader(self.rfile)
+ call_request_reader = avro.ipc.FramedReader(self.rfile)
call_request = call_request_reader.read_framed_message()
resp_body = self.responder.respond(call_request)
self.send_response(200)
self.send_header('Content-Type', 'avro/binary')
self.end_headers()
- resp_writer = ipc.FramedWriter(self.wfile)
+ resp_writer = avro.ipc.FramedWriter(self.wfile)
resp_writer.write_framed_message(resp_body)
return TaskRunnerHTTPHandler
-class TaskRunner(object):
+class TaskRunner:
"""This class ties together the server handling the requests from
the parent process and the instance of TetherTask which actually
implements the logic for the mapper and reducer phases
@@ -180,7 +174,7 @@ class TaskRunner(object):
address = ("localhost", port)
def thread_run(task_runner=None):
- task_runner.server = http_server.HTTPServer(address,
HTTPHandlerGen(task_runner))
+ task_runner.server = http.server.HTTPServer(address,
HTTPHandlerGen(task_runner))
task_runner.server.allow_reuse_address = True
task_runner.server.serve_forever()
diff --git a/lang/py/avro/tether/util.py b/lang/py/avro/tether/util.py
index d8c3058..5bc3410 100644
--- a/lang/py/avro/tether/util.py
+++ b/lang/py/avro/tether/util.py
@@ -19,8 +19,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import socket
diff --git a/lang/py/avro/timezones.py b/lang/py/avro/timezones.py
index 934965a..aeef181 100644
--- a/lang/py/avro/timezones.py
+++ b/lang/py/avro/timezones.py
@@ -19,35 +19,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
+import datetime
-from datetime import datetime, timedelta, tzinfo
-
-class UTCTzinfo(tzinfo):
+class UTCTzinfo(datetime.tzinfo):
def utcoffset(self, dt):
- return timedelta(0)
+ return datetime.timedelta(0)
def tzname(self, dt):
return "UTC"
def dst(self, dt):
- return timedelta(0)
+ return datetime.timedelta(0)
utc = UTCTzinfo()
# Test Time Zone with fixed offset and no DST
-class TSTTzinfo(tzinfo):
+class TSTTzinfo(datetime.tzinfo):
def utcoffset(self, dt):
- return timedelta(hours=10)
+ return datetime.timedelta(hours=10)
def tzname(self, dt):
return "TST"
def dst(self, dt):
- return timedelta(0)
+ return datetime.timedelta(0)
tst = TSTTzinfo()
diff --git a/lang/py/avro/tool.py b/lang/py/avro/tool.py
index 2bcf5c3..454c713 100644
--- a/lang/py/avro/tool.py
+++ b/lang/py/avro/tool.py
@@ -25,31 +25,23 @@ Command-line tool
NOTE: The API for the command-line tool is experimental.
"""
-from __future__ import absolute_import, division, print_function
-
+import http.server
import os.path
import sys
import threading
+import urllib.parse
import warnings
+import avro.datafile
import avro.io
-from avro import datafile, ipc, protocol
-
-try:
- import BaseHTTPServer as http_server # type: ignore
-except ImportError:
- import http.server as http_server # type: ignore
-
-try:
- from urllib.parse import urlparse
-except ImportError:
- from urlparse import urlparse # type: ignore
+import avro.ipc
+import avro.protocol
-class GenericResponder(ipc.Responder):
+class GenericResponder(avro.ipc.Responder):
def __init__(self, proto, msg, datum):
proto_json = open(proto, 'rb').read()
- ipc.Responder.__init__(self, protocol.parse(proto_json))
+ avro.ipc.Responder.__init__(self, avro.protocol.parse(proto_json))
self.msg = msg
self.datum = datum
@@ -62,16 +54,16 @@ class GenericResponder(ipc.Responder):
return self.datum
-class GenericHandler(http_server.BaseHTTPRequestHandler):
+class GenericHandler(http.server.BaseHTTPRequestHandler):
def do_POST(self):
self.responder = responder
- call_request_reader = ipc.FramedReader(self.rfile)
+ call_request_reader = avro.ipc.FramedReader(self.rfile)
call_request = call_request_reader.read_framed_message()
resp_body = self.responder.respond(call_request)
self.send_response(200)
self.send_header('Content-Type', 'avro/binary')
self.end_headers()
- resp_writer = ipc.FramedWriter(self.wfile)
+ resp_writer = avro.ipc.FramedWriter(self.wfile)
resp_writer.write_framed_message(resp_body)
if server_should_shutdown:
print("Shutting down server.", file=sys.stderr)
@@ -81,13 +73,13 @@ class GenericHandler(http_server.BaseHTTPRequestHandler):
def run_server(uri, proto, msg, datum):
- url_obj = urlparse(uri)
+ url_obj = urllib.parse.urlparse(uri)
server_addr = (url_obj.hostname, url_obj.port)
global responder
global server_should_shutdown
server_should_shutdown = False
responder = GenericResponder(proto, msg, datum)
- server = http_server.HTTPServer(server_addr, GenericHandler)
+ server = http.server.HTTPServer(server_addr, GenericHandler)
print("Port: %s" % server.server_port)
sys.stdout.flush()
server.allow_reuse_address = True
@@ -96,10 +88,10 @@ def run_server(uri, proto, msg, datum):
def send_message(uri, proto, msg, datum):
- url_obj = urlparse(uri)
- client = ipc.HTTPTransceiver(url_obj.hostname, url_obj.port)
+ url_obj = urllib.parse.urlparse(uri)
+ client = avro.ipc.HTTPTransceiver(url_obj.hostname, url_obj.port)
proto_json = open(proto, 'rb').read()
- requestor = ipc.Requestor(protocol.parse(proto_json), client)
+ requestor = avro.ipc.Requestor(avro.protocol.parse(proto_json), client)
print(requestor.request(msg, datum))
##
@@ -119,7 +111,7 @@ def main(args=sys.argv):
if len(args) != 3:
print("Usage: %s dump input_file" % args[0])
return 1
- for d in datafile.DataFileReader(file_or_stdin(args[2]),
avro.io.DatumReader()):
+ for d in avro.datafile.DataFileReader(file_or_stdin(args[2]),
avro.io.DatumReader()):
print(repr(d))
elif args[1] == "rpcreceive":
usage_str = "Usage: %s rpcreceive uri protocol_file " % args[0]
@@ -133,7 +125,7 @@ def main(args=sys.argv):
if args[5] == "-file":
reader = open(args[6], 'rb')
datum_reader = avro.io.DatumReader()
- dfr = datafile.DataFileReader(reader, datum_reader)
+ dfr = avro.datafile.DataFileReader(reader, datum_reader)
datum = next(dfr)
elif args[5] == "-data":
print("JSON Decoder not yet implemented.")
@@ -154,7 +146,7 @@ def main(args=sys.argv):
if args[5] == "-file":
reader = open(args[6], 'rb')
datum_reader = avro.io.DatumReader()
- dfr = datafile.DataFileReader(reader, datum_reader)
+ dfr = avro.datafile.DataFileReader(reader, datum_reader)
datum = next(dfr)
elif args[5] == "-data":
print("JSON Decoder not yet implemented.")
diff --git a/lang/py/scripts/avro b/lang/py/scripts/avro
index 29a2ffe..d0a2435 100755
--- a/lang/py/scripts/avro
+++ b/lang/py/scripts/avro
@@ -18,37 +18,21 @@
"""Command line utility for reading and writing Avro files."""
-from __future__ import absolute_import, division, print_function
-
import csv
-import itertools
+import functools
import json
+import optparse
import os.path
import sys
-from functools import partial
-from optparse import OptionGroup, OptionParser
import avro
+import avro.datafile
import avro.errors
+import avro.io
import avro.schema
-from avro.datafile import DataFileReader, DataFileWriter
-from avro.io import DatumReader, DatumWriter
_AVRO_DIR = os.path.abspath(os.path.dirname(avro.__file__))
-ifilter = getattr(itertools, 'ifilter', filter)
-imap = getattr(itertools, 'imap', map)
-
-try:
- unicode
-except NameError:
- unicode = str
-
-try:
- long
-except NameError:
- long = int
-
def _version():
with open(os.path.join(_AVRO_DIR, 'VERSION.txt')) as v:
@@ -113,7 +97,8 @@ def print_avro(avro, opts):
# Apply filter first
if opts.filter:
- avro = ifilter(partial(record_match, opts.filter), avro)
+ predicate = functools.partial(record_match, opts.filter)
+ avro = (r for r in avro if predicate(r))
for i in range(opts.skip):
try:
@@ -123,7 +108,8 @@ def print_avro(avro, opts):
fields = parse_fields(opts.fields)
if fields:
- avro = imap(field_selector(fields), avro)
+ fs = field_selector(fields)
+ avro = (fs(r) for r in avro)
printer = select_printer(opts.format)
for i, record in enumerate(avro):
@@ -144,11 +130,11 @@ def cat(opts, args):
if not args:
raise avro.errors.UsageError("No files to show")
for filename in args:
- with DataFileReader(open(filename, 'rb'), DatumReader()) as avro:
+ with avro.datafile.DataFileReader(open(filename, 'rb'),
avro.io.DatumReader()) as avro_:
if opts.print_schema:
- print_schema(avro)
+ print_schema(avro_)
continue
- print_avro(avro, opts)
+ print_avro(avro_, opts)
def _open(filename, mode):
@@ -177,10 +163,10 @@ def convert(value, field):
return {
"int": int,
- "long": long,
+ "long": int,
"float": float,
"double": float,
- "string": unicode,
+ "string": str,
"bytes": bytes,
"boolean": bool,
"null": lambda _: None,
@@ -232,7 +218,7 @@ def write(opts, files):
except (IOError, OSError) as e:
raise avro.errors.UsageError("Can't open file - %s" % e)
- writer = DataFileWriter(getattr(out, 'buffer', out), DatumWriter(), schema)
+ writer = avro.datafile.DataFileWriter(getattr(out, 'buffer', out),
avro.io.DatumWriter(), schema)
for filename in (files or ["-"]):
info = _open(filename, "rb")
@@ -243,11 +229,11 @@ def write(opts, files):
def main(argv):
- parser = OptionParser(description="Display/write for Avro files",
- version=_AVRO_VERSION,
- usage="usage: %prog cat|write [options] FILE
[FILE...]")
+ parser = optparse.OptionParser(description="Display/write for Avro files",
+ version=_AVRO_VERSION,
+ usage="usage: %prog cat|write [options]
FILE [FILE...]")
# cat options
- cat_options = OptionGroup(parser, "cat options")
+ cat_options = optparse.OptionGroup(parser, "cat options")
cat_options.add_option("-n", "--count", default=float("Infinity"),
help="number of records to print", type=int)
cat_options.add_option("-s", "--skip", help="number of records to skip",
@@ -266,7 +252,7 @@ def main(argv):
parser.add_option_group(cat_options)
# write options
- write_options = OptionGroup(parser, "write options")
+ write_options = optparse.OptionGroup(parser, "write options")
write_options.add_option("--schema", help="schema file (required)")
write_options.add_option("--input-type",
help="input file(s) type (json or csv)",
diff --git a/lang/py/setup.py b/lang/py/setup.py
index 42d8320..91dd948 100755
--- a/lang/py/setup.py
+++ b/lang/py/setup.py
@@ -20,8 +20,6 @@
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
import setuptools # type: ignore
import distutils.errors
import glob