This is an automated email from the ASF dual-hosted git repository.
kojiromike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new ac82a6e AVRO-2556: Add bzip2 codec to the Python2 bindings (#645)
ac82a6e is described below
commit ac82a6ef35b7ca76c557d46471becc818846c35a
Author: Kengo Seki <[email protected]>
AuthorDate: Wed Jan 15 22:22:42 2020 +0900
AVRO-2556: Add bzip2 codec to the Python2 bindings (#645)
* AVRO-2556: Add bzip2 codec to the Python2 bindings
* AVRO-2556: Address bz2 import error
* AVRO-2556: Define abstract Codec class as a superclass of all codecs
* Remove redundant code, add docstrings, introduce abc, etc.
---
lang/py/avro/__init__.py | 2 +-
lang/py/avro/codecs.py | 204 ++++++++++++++++++++++++++++++++++
lang/py/avro/datafile.py | 83 ++------------
lang/py/avro/io.py | 37 +-----
lang/py/avro/ipc.py | 3 +-
lang/py/avro/test/gen_interop_data.py | 17 +--
lang/py/avro/test/test_datafile.py | 13 +--
7 files changed, 228 insertions(+), 131 deletions(-)
diff --git a/lang/py/avro/__init__.py b/lang/py/avro/__init__.py
index 9a859e9..096f645 100644
--- a/lang/py/avro/__init__.py
+++ b/lang/py/avro/__init__.py
@@ -19,4 +19,4 @@
from __future__ import absolute_import, division, print_function
-__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants',
'timezones']
+__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants',
'timezones', 'codecs']
diff --git a/lang/py/avro/codecs.py b/lang/py/avro/codecs.py
new file mode 100644
index 0000000..772db3e
--- /dev/null
+++ b/lang/py/avro/codecs.py
@@ -0,0 +1,204 @@
+#!/usr/bin/env python
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Contains Codecs for Python Avro.
+
+Note that the word "codecs" means "compression/decompression algorithms" in the
+Avro world
(https://avro.apache.org/docs/current/spec.html#Object+Container+Files),
+so don't confuse it with Python's "codecs" package, which is mainly for
+converting charsets (https://docs.python.org/3/library/codecs.html).
+"""
+
+from __future__ import absolute_import, division, print_function
+
+import io
+import struct
+import sys
+import zlib
+from abc import ABCMeta, abstractmethod
+from binascii import crc32
+from struct import Struct
+
+import avro.io
+from avro.schema import AvroException
+
+#
+# Constants
+#
+STRUCT_CRC32 = Struct('>I') # big-endian unsigned int
+
+
+try:
+ import bz2
+ has_bzip2 = True
+except ImportError:
+ has_bzip2 = False
+try:
+ import snappy
+ has_snappy = True
+except ImportError:
+ has_snappy = False
+try:
+ import zstandard as zstd
+ has_zstandard = True
+except ImportError:
+ has_zstandard = False
+
+
+class Codec:
+ """Abstract base class for all Avro codec classes."""
+ __metaclass__ = ABCMeta
+
+ @abstractmethod
+ def compress(self, data):
+ """Compress the passed data.
+
+ :param data: a byte string to be compressed
+ :type data: bytes
+
+ :rtype: tuple
+ :return: compressed data and its length
+ """
+ pass
+
+ @abstractmethod
+ def decompress(self, readers_decoder):
+ """Read compressed data via the passed BinaryDecoder and decompress it.
+
+ :param readers_decoder: a BinaryDecoder object currently being used for
+ reading an object container file
+ :type readers_decoder: avro.io.BinaryDecoder
+
+ :rtype: avro.io.BinaryDecoder
+ :return: a newly instantiated BinaryDecoder object that contains the
+ decompressed data which is wrapped by a BytesIO
+ """
+ pass
+
+
+class NullCodec(Codec):
+ def compress(self, data):
+ return data, len(data)
+
+ def decompress(self, readers_decoder):
+ readers_decoder.skip_long()
+ return readers_decoder
+
+
+class DeflateCodec(Codec):
+ def compress(self, data):
+ # The first two characters and last character are zlib
+ # wrappers around deflate data.
+ compressed_data = zlib.compress(data)[2:-1]
+ return compressed_data, len(compressed_data)
+
+ def decompress(self, readers_decoder):
+ # Compressed data is stored as (length, data), which
+ # corresponds to how the "bytes" type is encoded.
+ data = readers_decoder.read_bytes()
+ # -15 is the log of the window size; negative indicates
+ # "raw" (no zlib headers) decompression. See zlib.h.
+ uncompressed = zlib.decompress(data, -15)
+ return avro.io.BinaryDecoder(io.BytesIO(uncompressed))
+
+
+if has_bzip2:
+ class BZip2Codec(Codec):
+ def compress(self, data):
+ compressed_data = bz2.compress(data)
+ return compressed_data, len(compressed_data)
+
+ def decompress(self, readers_decoder):
+ length = readers_decoder.read_long()
+ data = readers_decoder.read(length)
+ uncompressed = bz2.decompress(data)
+ return avro.io.BinaryDecoder(io.BytesIO(uncompressed))
+
+
+if has_snappy:
+ class SnappyCodec(Codec):
+ def compress(self, data):
+ compressed_data = snappy.compress(data)
+ # A 4-byte, big-endian CRC32 checksum
+ compressed_data += STRUCT_CRC32.pack(crc32(data) & 0xffffffff)
+ return compressed_data, len(compressed_data)
+
+ def decompress(self, readers_decoder):
+ # Compressed data includes a 4-byte CRC32 checksum
+ length = readers_decoder.read_long()
+ data = readers_decoder.read(length - 4)
+ uncompressed = snappy.decompress(data)
+ checksum = readers_decoder.read(4)
+ self.check_crc32(uncompressed, checksum)
+ return avro.io.BinaryDecoder(io.BytesIO(uncompressed))
+
+ def check_crc32(self, bytes, checksum):
+ checksum = STRUCT_CRC32.unpack(checksum)[0];
+ if crc32(bytes) & 0xffffffff != checksum:
+ raise AvroException("Checksum failure")
+
+
+if has_zstandard:
+ class ZstandardCodec(Codec):
+ def compress(self, data):
+ compressed_data = zstd.ZstdCompressor().compress(data)
+ return compressed_data, len(compressed_data)
+
+ def decompress(self, readers_decoder):
+ length = readers_decoder.read_long()
+ data = readers_decoder.read(length)
+ uncompressed = bytearray()
+ dctx = zstd.ZstdDecompressor()
+ with dctx.stream_reader(io.BytesIO(data)) as reader:
+ while True:
+ chunk = reader.read(16384)
+ if not chunk:
+ break
+ uncompressed.extend(chunk)
+ return avro.io.BinaryDecoder(io.BytesIO(uncompressed))
+
+
+class Codecs(object):
+ @staticmethod
+ def get_codec(codec_name):
+ codec_name = codec_name.lower()
+ if codec_name == "null":
+ return NullCodec()
+ elif codec_name == "deflate":
+ return DeflateCodec()
+ elif codec_name == "bzip2" and has_bzip2:
+ return BZip2Codec()
+ elif codec_name == "snappy" and has_snappy:
+ return SnappyCodec()
+ elif codec_name == "zstandard" and has_zstandard:
+ return ZstandardCodec()
+ else:
+ raise ValueError("Unsupported codec: %r" % codec_name)
+
+ @staticmethod
+ def supported_codec_names():
+ codec_names = ['null', 'deflate']
+ if has_bzip2:
+ codec_names.append('bzip2')
+ if has_snappy:
+ codec_names.append('snappy')
+ if has_zstandard:
+ codec_names.append('zstandard')
+ return codec_names
diff --git a/lang/py/avro/datafile.py b/lang/py/avro/datafile.py
index 5c6eae6..6d9222b 100644
--- a/lang/py/avro/datafile.py
+++ b/lang/py/avro/datafile.py
@@ -28,18 +28,7 @@ import zlib
import avro.io
import avro.schema
-
-try:
- import snappy
- has_snappy = True
-except ImportError:
- has_snappy = False
-try:
- import zstandard as zstd
- has_zstandard = True
-except ImportError:
- has_zstandard = False
-
+from avro.codecs import Codecs
#
# Constants
@@ -56,11 +45,9 @@ META_SCHEMA = avro.schema.parse("""\
{"name": "meta", "type": {"type": "map", "values": "bytes"}},
{"name": "sync", "type": {"type": "fixed", "name": "sync", "size": %d}}]}
""" % (MAGIC_SIZE, SYNC_SIZE))
-VALID_CODECS = ['null', 'deflate']
-if has_snappy:
- VALID_CODECS.append('snappy')
-if has_zstandard:
- VALID_CODECS.append('zstandard')
+
+NULL_CODEC = 'null'
+VALID_CODECS = Codecs.supported_codec_names()
VALID_ENCODINGS = ['binary'] # not used yet
CODEC_KEY = "avro.codec"
@@ -142,7 +129,7 @@ class _DataFile(object):
class DataFileWriter(_DataFile):
# TODO(hammer): make 'encoder' a metadata property
- def __init__(self, writer, datum_writer, writers_schema=None, codec='null'):
+ def __init__(self, writer, datum_writer, writers_schema=None,
codec=NULL_CODEC):
"""
If the schema is not present, presume we're appending.
@@ -215,24 +202,8 @@ class DataFileWriter(_DataFile):
# write block contents
uncompressed_data = self.buffer_writer.getvalue()
- codec = self.codec
- if codec == 'null':
- compressed_data = uncompressed_data
- compressed_data_length = len(compressed_data)
- elif codec == 'deflate':
- # The first two characters and last character are zlib
- # wrappers around deflate data.
- compressed_data = zlib.compress(uncompressed_data)[2:-1]
- compressed_data_length = len(compressed_data)
- elif codec == 'snappy':
- compressed_data = snappy.compress(uncompressed_data)
- compressed_data_length = len(compressed_data) + 4 # crc32
- elif codec == 'zstandard':
- compressed_data = zstd.ZstdCompressor().compress(uncompressed_data)
- compressed_data_length = len(compressed_data)
- else:
- fail_msg = '"%s" codec is not supported.' % self.codec
- raise DataFileException(fail_msg)
+ codec = Codecs.get_codec(self.codec)
+ compressed_data, compressed_data_length =
codec.compress(uncompressed_data)
# Write length of block
self.encoder.write_long(compressed_data_length)
@@ -240,10 +211,6 @@ class DataFileWriter(_DataFile):
# Write block
self.writer.write(compressed_data)
- # Write CRC32 checksum for Snappy
- if codec == 'snappy':
- self.encoder.write_crc32(uncompressed_data)
-
# write sync marker
self.writer.write(self.sync_marker)
@@ -345,40 +312,8 @@ class DataFileReader(_DataFile):
def _read_block_header(self):
self.block_count = self.raw_decoder.read_long()
- codec = self.codec
- if codec == "null":
- # Skip a long; we don't need to use the length.
- self.raw_decoder.skip_long()
- self._datum_decoder = self._raw_decoder
- elif codec == 'deflate':
- # Compressed data is stored as (length, data), which
- # corresponds to how the "bytes" type is encoded.
- data = self.raw_decoder.read_bytes()
- # -15 is the log of the window size; negative indicates
- # "raw" (no zlib headers) decompression. See zlib.h.
- uncompressed = zlib.decompress(data, -15)
- self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
- elif codec == 'snappy':
- # Compressed data includes a 4-byte CRC32 checksum
- length = self.raw_decoder.read_long()
- data = self.raw_decoder.read(length - 4)
- uncompressed = snappy.decompress(data)
- self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
- self.raw_decoder.check_crc32(uncompressed);
- elif codec == 'zstandard':
- length = self.raw_decoder.read_long()
- data = self.raw_decoder.read(length)
- uncompressed = bytearray()
- dctx = zstd.ZstdDecompressor()
- with dctx.stream_reader(io.BytesIO(data)) as reader:
- while True:
- chunk = reader.read(16384)
- if not chunk:
- break
- uncompressed.extend(chunk)
- self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
- else:
- raise DataFileException("Unknown codec: %r" % self.codec)
+ codec = Codecs.get_codec(self.codec)
+ self._datum_decoder = codec.decompress(self.raw_decoder)
def _skip_sync(self):
"""
diff --git a/lang/py/avro/io.py b/lang/py/avro/io.py
index 6e9ccdd..52b631a 100644
--- a/lang/py/avro/io.py
+++ b/lang/py/avro/io.py
@@ -47,8 +47,8 @@ import datetime
import json
import struct
import sys
-from binascii import crc32
from decimal import Decimal, getcontext
+from struct import Struct
from avro import constants, schema, timezones
@@ -81,24 +81,11 @@ LONG_MIN_VALUE = -(1 << 63)
LONG_MAX_VALUE = (1 << 63) - 1
# TODO(hammer): shouldn't ! be < for little-endian (according to spec?)
-if sys.version_info >= (2, 5, 0):
- struct_class = struct.Struct
-else:
- class SimpleStruct(object):
- def __init__(self, format):
- self.format = format
- def pack(self, *args):
- return struct.pack(self.format, *args)
- def unpack(self, *args):
- return struct.unpack(self.format, *args)
- struct_class = SimpleStruct
-
-STRUCT_FLOAT = struct_class('<f') # big-endian float
-STRUCT_DOUBLE = struct_class('<d') # big-endian double
-STRUCT_CRC32 = struct_class('>I') # big-endian unsigned int
-STRUCT_SIGNED_SHORT = struct_class('>h') # big-endian signed short
-STRUCT_SIGNED_INT = struct_class('>i') # big-endian signed int
-STRUCT_SIGNED_LONG = struct_class('>q') # big-endian signed long
+STRUCT_FLOAT = Struct('<f') # little-endian float
+STRUCT_DOUBLE = Struct('<d') # little-endian double
+STRUCT_SIGNED_SHORT = Struct('>h') # big-endian signed short
+STRUCT_SIGNED_INT = Struct('>i') # big-endian signed int
+STRUCT_SIGNED_LONG = Struct('>q') # big-endian signed long
#
@@ -373,12 +360,6 @@ class BinaryDecoder(object):
unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0,
tzinfo=timezones.utc)
return unix_epoch_datetime + timedelta
-
- def check_crc32(self, bytes):
- checksum = STRUCT_CRC32.unpack(self.read(4))[0];
- if crc32(bytes) & 0xffffffff != checksum:
- raise schema.AvroException("Checksum failure")
-
def skip_null(self):
pass
@@ -551,12 +532,6 @@ class BinaryEncoder(object):
datum = datum.encode("utf-8")
self.write_bytes(datum)
- def write_crc32(self, bytes):
- """
- A 4-byte, big-endian CRC32 checksum
- """
- self.write(STRUCT_CRC32.pack(crc32(bytes) & 0xffffffff));
-
def write_date_int(self, datum):
"""
Encode python date object as int.
diff --git a/lang/py/avro/ipc.py b/lang/py/avro/ipc.py
index 1c69063..9f2d67d 100644
--- a/lang/py/avro/ipc.py
+++ b/lang/py/avro/ipc.py
@@ -23,6 +23,7 @@ from __future__ import absolute_import, division,
print_function
import io
import os
+from struct import Struct
import avro.io
from avro import protocol, schema
@@ -64,7 +65,7 @@ SYSTEM_ERROR_SCHEMA = schema.parse('["string"]')
REMOTE_HASHES = {}
REMOTE_PROTOCOLS = {}
-BIG_ENDIAN_INT_STRUCT = avro.io.struct_class('!I')
+BIG_ENDIAN_INT_STRUCT = Struct('!I')
BUFFER_HEADER_LENGTH = 4
BUFFER_SIZE = 8192
diff --git a/lang/py/avro/test/gen_interop_data.py
b/lang/py/avro/test/gen_interop_data.py
index 6f2f428..c4bc65b 100644
--- a/lang/py/avro/test/gen_interop_data.py
+++ b/lang/py/avro/test/gen_interop_data.py
@@ -26,24 +26,15 @@ import sys
import avro.datafile
import avro.io
import avro.schema
+from avro.codecs import Codecs
try:
unicode
except NameError:
unicode = str
-CODECS_TO_VALIDATE = ('null', 'deflate')
-
-try:
- import snappy
- CODECS_TO_VALIDATE += ('snappy',)
-except ImportError:
- print('Snappy not present, will skip generating it.')
-try:
- import zstandard
- CODECS_TO_VALIDATE += ('zstandard',)
-except ImportError:
- print('Zstandard not present, will skip generating it.')
+NULL_CODEC = 'null'
+CODECS_TO_VALIDATE = Codecs.supported_codec_names()
DATUM = {
'intField': 12,
@@ -69,7 +60,7 @@ def generate(schema_path, output_path):
interop_schema = avro.schema.parse(schema_file.read())
for codec in CODECS_TO_VALIDATE:
filename = output_path
- if codec != 'null':
+ if codec != NULL_CODEC:
base, ext = os.path.splitext(output_path)
filename = base + "_" + codec + ext
with avro.datafile.DataFileWriter(open(filename, 'wb'),
avro.io.DatumWriter(),
diff --git a/lang/py/avro/test/test_datafile.py
b/lang/py/avro/test/test_datafile.py
index b23b5b1..dad56ef 100644
--- a/lang/py/avro/test/test_datafile.py
+++ b/lang/py/avro/test/test_datafile.py
@@ -23,6 +23,7 @@ import os
import unittest
from avro import datafile, io, schema
+from avro.codecs import Codecs
try:
unicode
@@ -64,17 +65,7 @@ SCHEMAS_TO_VALIDATE = (
)
FILENAME = 'test_datafile.out'
-CODECS_TO_VALIDATE = ('null', 'deflate')
-try:
- import snappy
- CODECS_TO_VALIDATE += ('snappy',)
-except ImportError:
- print('Snappy not present, will skip testing it.')
-try:
- import zstandard
- CODECS_TO_VALIDATE += ('zstandard',)
-except ImportError:
- print('Zstandard not present, will skip testing it.')
+CODECS_TO_VALIDATE = Codecs.supported_codec_names()
class TestDataFile(unittest.TestCase):
def test_round_trip(self):