This is an automated email from the ASF dual-hosted git repository.

kojiromike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new ac82a6e  AVRO-2556: Add bzip2 codec to the Python2 bindings (#645)
ac82a6e is described below

commit ac82a6ef35b7ca76c557d46471becc818846c35a
Author: Kengo Seki <[email protected]>
AuthorDate: Wed Jan 15 22:22:42 2020 +0900

    AVRO-2556: Add bzip2 codec to the Python2 bindings (#645)
    
    * AVRO-2556: Add bzip2 codec to the Python2 bindings
    
    * AVRO-2556: Address bz2 import error
    
    * AVRO-2556: Define abstract Codec class as a superclass of all codecs
    
    * Remove redundant code, add docstrings, introduce abc, etc.
---
 lang/py/avro/__init__.py              |   2 +-
 lang/py/avro/codecs.py                | 204 ++++++++++++++++++++++++++++++++++
 lang/py/avro/datafile.py              |  83 ++------------
 lang/py/avro/io.py                    |  37 +-----
 lang/py/avro/ipc.py                   |   3 +-
 lang/py/avro/test/gen_interop_data.py |  17 +--
 lang/py/avro/test/test_datafile.py    |  13 +--
 7 files changed, 228 insertions(+), 131 deletions(-)

diff --git a/lang/py/avro/__init__.py b/lang/py/avro/__init__.py
index 9a859e9..096f645 100644
--- a/lang/py/avro/__init__.py
+++ b/lang/py/avro/__init__.py
@@ -19,4 +19,4 @@
 
 from __future__ import absolute_import, division, print_function
 
-__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones']
+__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones', 'codecs']
diff --git a/lang/py/avro/codecs.py b/lang/py/avro/codecs.py
new file mode 100644
index 0000000..772db3e
--- /dev/null
+++ b/lang/py/avro/codecs.py
@@ -0,0 +1,204 @@
+#!/usr/bin/env python
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Contains Codecs for Python Avro.
+
+Note that the word "codecs" means "compression/decompression algorithms" in the
+Avro world (https://avro.apache.org/docs/current/spec.html#Object+Container+Files),
+so don't confuse it with the Python's "codecs", which is a package mainly for
+converting charsets (https://docs.python.org/3/library/codecs.html).
+"""
+
+from __future__ import absolute_import, division, print_function
+
+import io
+import struct
+import sys
+import zlib
+from abc import ABCMeta, abstractmethod
+from binascii import crc32
+from struct import Struct
+
+import avro.io
+from avro.schema import AvroException
+
+#
+# Constants
+#
+STRUCT_CRC32 = Struct('>I')  # big-endian unsigned int
+
+
+try:
+  import bz2
+  has_bzip2 = True
+except ImportError:
+  has_bzip2 = False
+try:
+  import snappy
+  has_snappy = True
+except ImportError:
+  has_snappy = False
+try:
+  import zstandard as zstd
+  has_zstandard = True
+except ImportError:
+  has_zstandard = False
+
+
+class Codec:
+  """Abstract base class for all Avro codec classes."""
+  __metaclass__ = ABCMeta
+
+  @abstractmethod
+  def compress(self, data):
+    """Compress the passed data.
+
+    :param data: a byte string to be compressed
+    :type data: str
+
+    :rtype: tuple
+    :return: compressed data and its length
+    """
+    pass
+
+  @abstractmethod
+  def decompress(self, readers_decoder):
+    """Read compressed data via the passed BinaryDecoder and decompress it.
+
+    :param readers_decoder: a BinaryDecoder object currently being used for
+                            reading an object container file
+    :type readers_decoder: avro.io.BinaryDecoder
+
+    :rtype: avro.io.BinaryDecoder
+    :return: a newly instantiated BinaryDecoder object that contains the
+             decompressed data which is wrapped by a StringIO
+    """
+    pass
+
+
+class NullCodec(Codec):
+  def compress(self, data):
+    return data, len(data)
+
+  def decompress(self, readers_decoder):
+    readers_decoder.skip_long()
+    return readers_decoder
+
+
+class DeflateCodec(Codec):
+  def compress(self, data):
+    # The first two characters and last character are zlib
+    # wrappers around deflate data.
+    compressed_data = zlib.compress(data)[2:-1]
+    return compressed_data, len(compressed_data)
+
+  def decompress(self, readers_decoder):
+    # Compressed data is stored as (length, data), which
+    # corresponds to how the "bytes" type is encoded.
+    data = readers_decoder.read_bytes()
+    # -15 is the log of the window size; negative indicates
+    # "raw" (no zlib headers) decompression.  See zlib.h.
+    uncompressed = zlib.decompress(data, -15)
+    return avro.io.BinaryDecoder(io.BytesIO(uncompressed))
+
+
+if has_bzip2:
+  class BZip2Codec(Codec):
+    def compress(self, data):
+      compressed_data = bz2.compress(data)
+      return compressed_data, len(compressed_data)
+
+    def decompress(self, readers_decoder):
+      length = readers_decoder.read_long()
+      data = readers_decoder.read(length)
+      uncompressed = bz2.decompress(data)
+      return avro.io.BinaryDecoder(io.BytesIO(uncompressed))
+
+
+if has_snappy:
+  class SnappyCodec(Codec):
+    def compress(self, data):
+      compressed_data = snappy.compress(data)
+      # A 4-byte, big-endian CRC32 checksum
+      compressed_data += STRUCT_CRC32.pack(crc32(data) & 0xffffffff)
+      return compressed_data, len(compressed_data)
+
+    def decompress(self, readers_decoder):
+      # Compressed data includes a 4-byte CRC32 checksum
+      length = readers_decoder.read_long()
+      data = readers_decoder.read(length - 4)
+      uncompressed = snappy.decompress(data)
+      checksum = readers_decoder.read(4)
+      self.check_crc32(uncompressed, checksum)
+      return avro.io.BinaryDecoder(io.BytesIO(uncompressed))
+
+    def check_crc32(self, bytes, checksum):
+      checksum = STRUCT_CRC32.unpack(checksum)[0];
+      if crc32(bytes) & 0xffffffff != checksum:
+        raise schema.AvroException("Checksum failure")
+
+
+if has_zstandard:
+  class ZstandardCodec(Codec):
+    def compress(self, data):
+      compressed_data = zstd.ZstdCompressor().compress(data)
+      return compressed_data, len(compressed_data)
+
+    def decompress(self, readers_decoder):
+      length = readers_decoder.read_long()
+      data = readers_decoder.read(length)
+      uncompressed = bytearray()
+      dctx = zstd.ZstdDecompressor()
+      with dctx.stream_reader(io.BytesIO(data)) as reader:
+        while True:
+          chunk = reader.read(16384)
+          if not chunk:
+            break
+          uncompressed.extend(chunk)
+      return avro.io.BinaryDecoder(io.BytesIO(uncompressed))
+
+
+class Codecs(object):
+  @staticmethod
+  def get_codec(codec_name):
+    codec_name = codec_name.lower()
+    if codec_name == "null":
+      return NullCodec()
+    elif codec_name == "deflate":
+      return DeflateCodec()
+    elif codec_name == "bzip2" and has_bzip2:
+      return BZip2Codec()
+    elif codec_name == "snappy" and has_snappy:
+      return SnappyCodec()
+    elif codec_name == "zstandard" and has_zstandard:
+      return ZstandardCodec()
+    else:
+      raise ValueError("Unsupported codec: %r" % codec_name)
+
+  @staticmethod
+  def supported_codec_names():
+    codec_names = ['null', 'deflate']
+    if has_bzip2:
+      codec_names.append('bzip2')
+    if has_snappy:
+      codec_names.append('snappy')
+    if has_zstandard:
+      codec_names.append('zstandard')
+    return codec_names
diff --git a/lang/py/avro/datafile.py b/lang/py/avro/datafile.py
index 5c6eae6..6d9222b 100644
--- a/lang/py/avro/datafile.py
+++ b/lang/py/avro/datafile.py
@@ -28,18 +28,7 @@ import zlib
 
 import avro.io
 import avro.schema
-
-try:
-  import snappy
-  has_snappy = True
-except ImportError:
-  has_snappy = False
-try:
-  import zstandard as zstd
-  has_zstandard = True
-except ImportError:
-  has_zstandard = False
-
+from avro.codecs import Codecs
 
 #
 # Constants
@@ -56,11 +45,9 @@ META_SCHEMA = avro.schema.parse("""\
    {"name": "meta", "type": {"type": "map", "values": "bytes"}},
    {"name": "sync", "type": {"type": "fixed", "name": "sync", "size": %d}}]}
 """ % (MAGIC_SIZE, SYNC_SIZE))
-VALID_CODECS = ['null', 'deflate']
-if has_snappy:
-    VALID_CODECS.append('snappy')
-if has_zstandard:
-    VALID_CODECS.append('zstandard')
+
+NULL_CODEC = 'null'
+VALID_CODECS = Codecs.supported_codec_names()
 VALID_ENCODINGS = ['binary'] # not used yet
 
 CODEC_KEY = "avro.codec"
@@ -142,7 +129,7 @@ class _DataFile(object):
 class DataFileWriter(_DataFile):
 
   # TODO(hammer): make 'encoder' a metadata property
-  def __init__(self, writer, datum_writer, writers_schema=None, codec='null'):
+  def __init__(self, writer, datum_writer, writers_schema=None, codec=NULL_CODEC):
     """
     If the schema is not present, presume we're appending.
 
@@ -215,24 +202,8 @@ class DataFileWriter(_DataFile):
 
       # write block contents
       uncompressed_data = self.buffer_writer.getvalue()
-      codec = self.codec
-      if codec == 'null':
-        compressed_data = uncompressed_data
-        compressed_data_length = len(compressed_data)
-      elif codec == 'deflate':
-        # The first two characters and last character are zlib
-        # wrappers around deflate data.
-        compressed_data = zlib.compress(uncompressed_data)[2:-1]
-        compressed_data_length = len(compressed_data)
-      elif codec == 'snappy':
-        compressed_data = snappy.compress(uncompressed_data)
-        compressed_data_length = len(compressed_data) + 4 # crc32
-      elif codec == 'zstandard':
-        compressed_data = zstd.ZstdCompressor().compress(uncompressed_data)
-        compressed_data_length = len(compressed_data)
-      else:
-        fail_msg = '"%s" codec is not supported.' % self.codec
-        raise DataFileException(fail_msg)
+      codec = Codecs.get_codec(self.codec)
+      compressed_data, compressed_data_length = 
codec.compress(uncompressed_data)
 
       # Write length of block
       self.encoder.write_long(compressed_data_length)
@@ -240,10 +211,6 @@ class DataFileWriter(_DataFile):
       # Write block
       self.writer.write(compressed_data)
 
-      # Write CRC32 checksum for Snappy
-      if codec == 'snappy':
-        self.encoder.write_crc32(uncompressed_data)
-
       # write sync marker
       self.writer.write(self.sync_marker)
 
@@ -345,40 +312,8 @@ class DataFileReader(_DataFile):
 
   def _read_block_header(self):
     self.block_count = self.raw_decoder.read_long()
-    codec = self.codec
-    if codec == "null":
-      # Skip a long; we don't need to use the length.
-      self.raw_decoder.skip_long()
-      self._datum_decoder = self._raw_decoder
-    elif codec == 'deflate':
-      # Compressed data is stored as (length, data), which
-      # corresponds to how the "bytes" type is encoded.
-      data = self.raw_decoder.read_bytes()
-      # -15 is the log of the window size; negative indicates
-      # "raw" (no zlib headers) decompression.  See zlib.h.
-      uncompressed = zlib.decompress(data, -15)
-      self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
-    elif codec == 'snappy':
-      # Compressed data includes a 4-byte CRC32 checksum
-      length = self.raw_decoder.read_long()
-      data = self.raw_decoder.read(length - 4)
-      uncompressed = snappy.decompress(data)
-      self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
-      self.raw_decoder.check_crc32(uncompressed);
-    elif codec == 'zstandard':
-      length = self.raw_decoder.read_long()
-      data = self.raw_decoder.read(length)
-      uncompressed = bytearray()
-      dctx = zstd.ZstdDecompressor()
-      with dctx.stream_reader(io.BytesIO(data)) as reader:
-        while True:
-          chunk = reader.read(16384)
-          if not chunk:
-            break
-          uncompressed.extend(chunk)
-      self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
-    else:
-      raise DataFileException("Unknown codec: %r" % self.codec)
+    codec = Codecs.get_codec(self.codec)
+    self._datum_decoder = codec.decompress(self.raw_decoder)
 
   def _skip_sync(self):
     """
diff --git a/lang/py/avro/io.py b/lang/py/avro/io.py
index 6e9ccdd..52b631a 100644
--- a/lang/py/avro/io.py
+++ b/lang/py/avro/io.py
@@ -47,8 +47,8 @@ import datetime
 import json
 import struct
 import sys
-from binascii import crc32
 from decimal import Decimal, getcontext
+from struct import Struct
 
 from avro import constants, schema, timezones
 
@@ -81,24 +81,11 @@ LONG_MIN_VALUE = -(1 << 63)
 LONG_MAX_VALUE = (1 << 63) - 1
 
 # TODO(hammer): shouldn't ! be < for little-endian (according to spec?)
-if sys.version_info >= (2, 5, 0):
-  struct_class = struct.Struct
-else:
-  class SimpleStruct(object):
-    def __init__(self, format):
-      self.format = format
-    def pack(self, *args):
-      return struct.pack(self.format, *args)
-    def unpack(self, *args):
-      return struct.unpack(self.format, *args)
-  struct_class = SimpleStruct
-
-STRUCT_FLOAT = struct_class('<f')           # big-endian float
-STRUCT_DOUBLE = struct_class('<d')          # big-endian double
-STRUCT_CRC32 = struct_class('>I')           # big-endian unsigned int
-STRUCT_SIGNED_SHORT = struct_class('>h')    # big-endian signed short
-STRUCT_SIGNED_INT = struct_class('>i')      # big-endian signed int
-STRUCT_SIGNED_LONG = struct_class('>q')     # big-endian signed long
+STRUCT_FLOAT = Struct('<f')           # big-endian float
+STRUCT_DOUBLE = Struct('<d')          # big-endian double
+STRUCT_SIGNED_SHORT = Struct('>h')    # big-endian signed short
+STRUCT_SIGNED_INT = Struct('>i')      # big-endian signed int
+STRUCT_SIGNED_LONG = Struct('>q')     # big-endian signed long
 
 
 #
@@ -373,12 +360,6 @@ class BinaryDecoder(object):
     unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=timezones.utc)
     return unix_epoch_datetime + timedelta
 
-
-  def check_crc32(self, bytes):
-    checksum = STRUCT_CRC32.unpack(self.read(4))[0];
-    if crc32(bytes) & 0xffffffff != checksum:
-      raise schema.AvroException("Checksum failure")
-
   def skip_null(self):
     pass
 
@@ -551,12 +532,6 @@ class BinaryEncoder(object):
     datum = datum.encode("utf-8")
     self.write_bytes(datum)
 
-  def write_crc32(self, bytes):
-    """
-    A 4-byte, big-endian CRC32 checksum
-    """
-    self.write(STRUCT_CRC32.pack(crc32(bytes) & 0xffffffff));
-
   def write_date_int(self, datum):
     """
     Encode python date object as int.
diff --git a/lang/py/avro/ipc.py b/lang/py/avro/ipc.py
index 1c69063..9f2d67d 100644
--- a/lang/py/avro/ipc.py
+++ b/lang/py/avro/ipc.py
@@ -23,6 +23,7 @@ from __future__ import absolute_import, division, print_function
 
 import io
 import os
+from struct import Struct
 
 import avro.io
 from avro import protocol, schema
@@ -64,7 +65,7 @@ SYSTEM_ERROR_SCHEMA = schema.parse('["string"]')
 REMOTE_HASHES = {}
 REMOTE_PROTOCOLS = {}
 
-BIG_ENDIAN_INT_STRUCT = avro.io.struct_class('!I')
+BIG_ENDIAN_INT_STRUCT = Struct('!I')
 BUFFER_HEADER_LENGTH = 4
 BUFFER_SIZE = 8192
 
diff --git a/lang/py/avro/test/gen_interop_data.py b/lang/py/avro/test/gen_interop_data.py
index 6f2f428..c4bc65b 100644
--- a/lang/py/avro/test/gen_interop_data.py
+++ b/lang/py/avro/test/gen_interop_data.py
@@ -26,24 +26,15 @@ import sys
 import avro.datafile
 import avro.io
 import avro.schema
+from avro.codecs import Codecs
 
 try:
   unicode
 except NameError:
   unicode = str
 
-CODECS_TO_VALIDATE = ('null', 'deflate')
-
-try:
-  import snappy
-  CODECS_TO_VALIDATE += ('snappy',)
-except ImportError:
-  print('Snappy not present, will skip generating it.')
-try:
-  import zstandard
-  CODECS_TO_VALIDATE += ('zstandard',)
-except ImportError:
-  print('Zstandard not present, will skip generating it.')
+NULL_CODEC = 'null'
+CODECS_TO_VALIDATE = Codecs.supported_codec_names()
 
 DATUM = {
   'intField': 12,
@@ -69,7 +60,7 @@ def generate(schema_path, output_path):
     interop_schema = avro.schema.parse(schema_file.read())
   for codec in CODECS_TO_VALIDATE:
     filename = output_path
-    if codec != 'null':
+    if codec != NULL_CODEC:
       base, ext = os.path.splitext(output_path)
       filename = base + "_" + codec + ext
     with avro.datafile.DataFileWriter(open(filename, 'wb'), avro.io.DatumWriter(),
diff --git a/lang/py/avro/test/test_datafile.py b/lang/py/avro/test/test_datafile.py
index b23b5b1..dad56ef 100644
--- a/lang/py/avro/test/test_datafile.py
+++ b/lang/py/avro/test/test_datafile.py
@@ -23,6 +23,7 @@ import os
 import unittest
 
 from avro import datafile, io, schema
+from avro.codecs import Codecs
 
 try:
   unicode
@@ -64,17 +65,7 @@ SCHEMAS_TO_VALIDATE = (
 )
 
 FILENAME = 'test_datafile.out'
-CODECS_TO_VALIDATE = ('null', 'deflate')
-try:
-  import snappy
-  CODECS_TO_VALIDATE += ('snappy',)
-except ImportError:
-  print('Snappy not present, will skip testing it.')
-try:
-  import zstandard
-  CODECS_TO_VALIDATE += ('zstandard',)
-except ImportError:
-  print('Zstandard not present, will skip testing it.')
+CODECS_TO_VALIDATE = Codecs.supported_codec_names()
 
 class TestDataFile(unittest.TestCase):
   def test_round_trip(self):

Reply via email to