This is an automated email from the ASF dual-hosted git repository. chamikara pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 045f9ea [BEAM-6211] Support deflate (zlib) in CompressedFile new 32ca551a Merge pull request #7253: [BEAM-6211] Support deflate (zlib) in CompressedFile 045f9ea is described below commit 045f9ea0564fd67b2c038d720de440640fdb1677 Author: Brian Martin <brianmar...@gmail.com> AuthorDate: Tue Dec 11 11:39:14 2018 -0500 [BEAM-6211] Support deflate (zlib) in CompressedFile `.deflate` files are quite common in Hadoop and also supported by TensorFlow in TFRecord file format. Moreover, `.deflate` is already supported since 0.6.0 by the Java SDK (see BEAM-1518). --- sdks/python/apache_beam/io/filesystem.py | 32 ++++++-- sdks/python/apache_beam/io/filesystem_test.py | 33 +++++--- sdks/python/apache_beam/io/textio_test.py | 113 ++++++++++++++++++++++++++ sdks/python/apache_beam/io/tfrecordio_test.py | 32 ++++++++ 4 files changed, 191 insertions(+), 19 deletions(-) diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 7dd3a5b..015af8b 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -58,12 +58,16 @@ class CompressionTypes(object): # The following extensions are currently recognized by auto-detection: # .bz2 (implies BZIP2 as described below). # .gz (implies GZIP as described below) + # .deflate (implies DEFLATE as described below) # Any non-recognized extension implies UNCOMPRESSED as described below. AUTO = 'auto' # BZIP2 compression. BZIP2 = 'bzip2' + # DEFLATE compression + DEFLATE = 'deflate' + # GZIP compression (deflate with GZIP headers). GZIP = 'gzip' @@ -76,6 +80,7 @@ class CompressionTypes(object): types = set([ CompressionTypes.AUTO, CompressionTypes.BZIP2, + CompressionTypes.DEFLATE, CompressionTypes.GZIP, CompressionTypes.UNCOMPRESSED ]) @@ -85,6 +90,7 @@ class CompressionTypes(object): def mime_type(cls, compression_type, default='application/octet-stream'): mime_types_by_compression_type = { cls.BZIP2: 'application/x-bz2', + cls.DEFLATE: 'application/x-deflate', cls.GZIP: 'application/x-gzip', } return mime_types_by_compression_type.get(compression_type, default) @@ -92,7 +98,8 @@ class CompressionTypes(object): @classmethod def detect_compression_type(cls, file_path): """Returns the compression type of a file (based on its suffix).""" - compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP} + compression_types_by_suffix = {'.bz2': cls.BZIP2, '.deflate': cls.DEFLATE, + '.gz': cls.GZIP} lowercased_path = file_path.lower() for suffix, compression_type in compression_types_by_suffix.items(): if lowercased_path.endswith(suffix): @@ -150,6 +157,8 @@ class CompressedFile(object): def _initialize_decompressor(self): if self._compression_type == CompressionTypes.BZIP2: self._decompressor = bz2.BZ2Decompressor() + elif self._compression_type == CompressionTypes.DEFLATE: + self._decompressor = zlib.decompressobj() else: assert self._compression_type == CompressionTypes.GZIP self._decompressor = zlib.decompressobj(self._gzip_mask) @@ -157,6 +166,9 @@ class CompressedFile(object): def _initialize_compressor(self): if self._compression_type == CompressionTypes.BZIP2: self._compressor = bz2.BZ2Compressor() + elif self._compression_type == CompressionTypes.DEFLATE: + self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, + zlib.DEFLATED) else: assert self._compression_type == CompressionTypes.GZIP self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, @@ -208,19 +220,25 @@ class CompressedFile(object): # file. We read concatenated files by recursively creating decompressor # objects for the unused compressed data. if (self._compression_type == CompressionTypes.BZIP2 or + self._compression_type == CompressionTypes.DEFLATE or self._compression_type == CompressionTypes.GZIP): if self._decompressor.unused_data != b'': buf = self._decompressor.unused_data - self._decompressor = ( - bz2.BZ2Decompressor() - if self._compression_type == CompressionTypes.BZIP2 - else zlib.decompressobj(self._gzip_mask)) + + if self._compression_type == CompressionTypes.BZIP2: + self._decompressor = bz2.BZ2Decompressor() + elif self._compression_type == CompressionTypes.DEFLATE: + self._decompressor = zlib.decompressobj() + else: + self._decompressor = zlib.decompressobj(self._gzip_mask) + decompressed = self._decompressor.decompress(buf) self._read_buffer.write(decompressed) continue else: - # Gzip and bzip2 formats do not require flushing remaining data in the - # decompressor into the read buffer when fully decompressing files. + # Deflate, Gzip and bzip2 formats do not require flushing + # remaining data in the decompressor into the read buffer when + # fully decompressing files. self._read_buffer.write(self._decompressor.flush()) # Record that we have hit the end of file, so we won't unnecessarily diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py index abbadde..b26d79d 100644 --- a/sdks/python/apache_beam/io/filesystem_test.py +++ b/sdks/python/apache_beam/io/filesystem_test.py @@ -29,6 +29,7 @@ import posixpath import sys import tempfile import unittest +import zlib from builtins import range from io import BytesIO @@ -294,16 +295,19 @@ atomized in instants hammered around the def _create_compressed_file(self, compression_type, content): file_name = self._create_temp_file() - if compression_type == CompressionTypes.BZIP2: - compress_factory = bz2.BZ2File - elif compression_type == CompressionTypes.GZIP: - compress_factory = gzip.open + if compression_type == CompressionTypes.DEFLATE: + with open(file_name, 'wb') as f: + f.write(zlib.compress(content)) + elif compression_type == CompressionTypes.BZIP2 or \ + compression_type == CompressionTypes.GZIP: + compress_open = bz2.BZ2File \ + if compression_type == CompressionTypes.BZIP2 \ + else gzip.open + with compress_open(file_name, 'wb') as f: + f.write(content) else: assert False, "Invalid compression type: %s" % compression_type - with compress_factory(file_name, 'wb') as f: - f.write(content) - return file_name def test_seekable_enabled_on_read(self): @@ -322,7 +326,8 @@ atomized in instants hammered around the self.assertFalse(writeable.seekable) def test_seek_set(self): - for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]: + for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE, + CompressionTypes.GZIP]: file_name = self._create_compressed_file(compression_type, self.content) with open(file_name, 'rb') as f: compressed_fd = CompressedFile(f, compression_type, @@ -352,7 +357,8 @@ atomized in instants hammered around the self.assertEqual(uncompressed_position, reference_position) def test_seek_cur(self): - for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]: + for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE, + CompressionTypes.GZIP]: file_name = self._create_compressed_file(compression_type, self.content) with open(file_name, 'rb') as f: compressed_fd = CompressedFile(f, compression_type, @@ -382,7 +388,8 @@ atomized in instants hammered around the self.assertEqual(uncompressed_position, reference_position) def test_read_from_end_returns_no_data(self): - for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]: + for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE, + CompressionTypes.GZIP]: file_name = self._create_compressed_file(compression_type, self.content) with open(file_name, 'rb') as f: compressed_fd = CompressedFile(f, compression_type, @@ -397,7 +404,8 @@ atomized in instants hammered around the self.assertEqual(uncompressed_data, expected_data) def test_seek_outside(self): - for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]: + for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE, + CompressionTypes.GZIP]: file_name = self._create_compressed_file(compression_type, self.content) with open(file_name, 'rb') as f: compressed_fd = CompressedFile(f, compression_type, @@ -419,7 +427,8 @@ atomized in instants hammered around the self.assertEqual(uncompressed_position, expected_position) def test_read_and_seek_back_to_beginning(self): - for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]: + for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE, + CompressionTypes.GZIP]: file_name = self._create_compressed_file(compression_type, self.content) with open(file_name, 'rb') as f: compressed_fd = CompressedFile(f, compression_type, diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index d3e32c8..780107a 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -29,6 +29,7 @@ import shutil import sys import tempfile import unittest +import zlib from builtins import range import apache_beam as beam @@ -554,6 +555,18 @@ class TextSourceTest(unittest.TestCase): assert_that(pcoll, equal_to(lines)) pipeline.run() + def test_read_auto_deflate(self): + _, lines = write_data(15) + with TempDir() as tempdir: + file_name = tempdir.create_temp_file(suffix='.deflate') + with open(file_name, 'wb') as f: + f.write(zlib.compress('\n'.join(lines).encode('utf-8'))) + + pipeline = TestPipeline() + pcoll = pipeline | 'Read' >> ReadFromText(file_name) + assert_that(pcoll, equal_to(lines)) + pipeline.run() + def test_read_auto_gzip(self): _, lines = write_data(15) with TempDir() as tempdir: @@ -641,6 +654,82 @@ class TextSourceTest(unittest.TestCase): assert_that(lines, equal_to(expected)) pipeline.run() + def test_read_deflate(self): + _, lines = write_data(15) + with TempDir() as tempdir: + file_name = tempdir.create_temp_file() + with open(file_name, 'wb') as f: + f.write(zlib.compress('\n'.join(lines).encode('utf-8'))) + + pipeline = TestPipeline() + pcoll = pipeline | 'Read' >> ReadFromText( + file_name, + 0, CompressionTypes.DEFLATE, + True, coders.StrUtf8Coder()) + assert_that(pcoll, equal_to(lines)) + pipeline.run() + + def test_read_corrupted_deflate_fails(self): + _, lines = write_data(15) + with TempDir() as tempdir: + file_name = tempdir.create_temp_file() + with open(file_name, 'wb') as f: + f.write(zlib.compress('\n'.join(lines).encode('utf-8'))) + + with open(file_name, 'wb') as f: + f.write(b'corrupt') + + pipeline = TestPipeline() + pcoll = pipeline | 'Read' >> ReadFromText( + file_name, + 0, CompressionTypes.DEFLATE, + True, coders.StrUtf8Coder()) + assert_that(pcoll, equal_to(lines)) + + with self.assertRaises(Exception): + pipeline.run() + + def test_read_deflate_concat(self): + with TempDir() as tempdir: + deflate_file_name1 = tempdir.create_temp_file() + lines = ['a', 'b', 'c'] + with open(deflate_file_name1, 'wb') as dst: + data = '\n'.join(lines) + '\n' + dst.write(zlib.compress(data.encode('utf-8'))) + + deflate_file_name2 = tempdir.create_temp_file() + lines = ['p', 'q', 'r'] + with open(deflate_file_name2, 'wb') as dst: + data = '\n'.join(lines) + '\n' + dst.write(zlib.compress(data.encode('utf-8'))) + + deflate_file_name3 = tempdir.create_temp_file() + lines = ['x', 'y', 'z'] + with open(deflate_file_name3, 'wb') as dst: + data = '\n'.join(lines) + '\n' + dst.write(zlib.compress(data.encode('utf-8'))) + + final_deflate_file = tempdir.create_temp_file() + with open(deflate_file_name1, 'rb') as src, \ + open(final_deflate_file, 'wb') as dst: + dst.writelines(src.readlines()) + + with open(deflate_file_name2, 'rb') as src, \ + open(final_deflate_file, 'ab') as dst: + dst.writelines(src.readlines()) + + with open(deflate_file_name3, 'rb') as src, \ + open(final_deflate_file, 'ab') as dst: + dst.writelines(src.readlines()) + + pipeline = TestPipeline() + lines = pipeline | 'ReadFromText' >> beam.io.ReadFromText( + final_deflate_file, + compression_type=beam.io.filesystem.CompressionTypes.DEFLATE) + + expected = ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z'] + assert_that(lines, equal_to(expected)) + def test_read_gzip(self): _, lines = write_data(15) with TempDir() as tempdir: @@ -975,6 +1064,30 @@ class TextSinkTest(unittest.TestCase): with gzip.GzipFile(self.path, 'rb') as f: self.assertEqual(f.read().splitlines(), []) + def test_write_deflate_file(self): + sink = TextSink( + self.path, compression_type=CompressionTypes.DEFLATE) + self._write_lines(sink, self.lines) + + with open(self.path, 'rb') as f: + self.assertEqual(zlib.decompress(f.read()).splitlines(), self.lines) + + def test_write_deflate_file_auto(self): + self.path = self._create_temp_file(suffix='.deflate') + sink = TextSink(self.path) + self._write_lines(sink, self.lines) + + with open(self.path, 'rb') as f: + self.assertEqual(zlib.decompress(f.read()).splitlines(), self.lines) + + def test_write_deflate_file_empty(self): + sink = TextSink( + self.path, compression_type=CompressionTypes.DEFLATE) + self._write_lines(sink, []) + + with open(self.path, 'rb') as f: + self.assertEqual(zlib.decompress(f.read()).splitlines(), []) + def test_write_text_file_with_header(self): header = b'header1\nheader2' sink = TextSink(self.path, header=header) diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py index 49956ea..f003c34 100644 --- a/sdks/python/apache_beam/io/tfrecordio_test.py +++ b/sdks/python/apache_beam/io/tfrecordio_test.py @@ -28,6 +28,7 @@ import random import re import sys import unittest +import zlib from builtins import range import crcmod @@ -76,6 +77,12 @@ def _write_file(path, base64_records): f.write(record) +def _write_file_deflate(path, base64_records): + record = binascii.a2b_base64(base64_records) + with open(path, 'wb') as f: + f.write(zlib.compress(record)) + + def _write_file_gzip(path, base64_records): record = binascii.a2b_base64(base64_records) with gzip.GzipFile(path, 'wb') as f: @@ -266,6 +273,19 @@ class TestReadFromTFRecord(unittest.TestCase): validate=True)) assert_that(result, equal_to([b'foo', b'bar'])) + def test_process_deflate(self): + with TempDir() as temp_dir: + path = temp_dir.create_temp_file('result') + _write_file_deflate(path, FOO_BAR_RECORD_BASE64) + with TestPipeline() as p: + result = (p + | ReadFromTFRecord( + path, + coder=coders.BytesCoder(), + compression_type=CompressionTypes.DEFLATE, + validate=True)) + assert_that(result, equal_to([b'foo', b'bar'])) + def test_process_gzip(self): with TempDir() as temp_dir: path = temp_dir.create_temp_file('result') @@ -372,6 +392,18 @@ class TestReadAllFromTFRecord(unittest.TestCase): compression_type=CompressionTypes.AUTO)) assert_that(result, equal_to([b'foo', b'bar'] * 9)) + def test_process_deflate(self): + with TempDir() as temp_dir: + path = temp_dir.create_temp_file('result') + _write_file_deflate(path, FOO_BAR_RECORD_BASE64) + with TestPipeline() as p: + result = (p + | Create([path]) + | ReadAllFromTFRecord( + coder=coders.BytesCoder(), + compression_type=CompressionTypes.DEFLATE)) + assert_that(result, equal_to([b'foo', b'bar'])) + def test_process_gzip(self): with TempDir() as temp_dir: path = temp_dir.create_temp_file('result')