Repository: incubator-beam Updated Branches: refs/heads/python-sdk 9bc04b750 -> a580b31ac
Add support for ZLIB and DEFLATE compression Code originally contributed by Slaven Bilac. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e1b3ac30 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e1b3ac30 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e1b3ac30 Branch: refs/heads/python-sdk Commit: e1b3ac30b5dd5b14058247fad73b6b235e618094 Parents: 9bc04b7 Author: Robert Bradshaw <[email protected]> Authored: Thu Jul 7 13:40:18 2016 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Thu Jul 7 13:40:18 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio.py | 168 +++++++++++++++++++++---- sdks/python/apache_beam/io/fileio_test.py | 30 ++++- 2 files changed, 173 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1b3ac30/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 6475a34..31b6a93 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -20,7 +20,6 @@ from __future__ import absolute_import import glob -import gzip import logging from multiprocessing.pool import ThreadPool import os @@ -28,6 +27,7 @@ import re import shutil import tempfile import time +import zlib from apache_beam import coders from apache_beam.io import iobase @@ -269,13 +269,129 @@ class _CompressionType(object): class CompressionTypes(object): """Enum-like class representing known compression types.""" NO_COMPRESSION = _CompressionType(1) # No compression. - DEFLATE = _CompressionType(2) # 'Deflate' ie gzip compression. 
+ DEFLATE = _CompressionType(2) # 'Deflate' compression (without headers). 
+ GZIP = _CompressionType(3) # gzip compression (deflate with gzip headers). + ZLIB = _CompressionType(4) # zlib compression (deflate with zlib headers). @staticmethod - def valid_compression_type(compression_type): + def is_valid_compression_type(compression_type): """Returns true for valid compression types, false otherwise.""" return isinstance(compression_type, _CompressionType) + @staticmethod + def mime_type(compression_type, default='application/octet-stream'): + if compression_type == CompressionTypes.GZIP: + return 'application/x-gzip' + elif compression_type == CompressionTypes.ZLIB: + return 'application/octet-stream' + elif compression_type == CompressionTypes.DEFLATE: + return 'application/octet-stream' + else: + return default + + +class _CompressedFile(object): + """Somewhat limited file wrapper for easier handling of compressed files.""" + _type_mask = { + CompressionTypes.ZLIB: zlib.MAX_WBITS, + CompressionTypes.GZIP: zlib.MAX_WBITS | 16, + CompressionTypes.DEFLATE: -zlib.MAX_WBITS, + } + + def __init__(self, + fileobj=None, + compression_type=CompressionTypes.ZLIB, + read_size=16384): + self._validate_compression_type(compression_type) + if not fileobj: + raise ValueError('fileobj must be opened file but was %s' % fileobj) + + self.fileobj = fileobj + self.data = '' + self.read_size = read_size + self.compression_type = compression_type + if self._readable(): + self.decompressor = self._create_decompressor(self.compression_type) + else: + self.decompressor = None + if self._writeable(): + self.compressor = self._create_compressor(self.compression_type) + else: + self.compressor = None + + def _validate_compression_type(self, compression_type): + if not CompressionTypes.is_valid_compression_type(compression_type): + raise TypeError('compression_type must be CompressionType object but ' + 'was %s' % type(compression_type)) + if compression_type == CompressionTypes.NO_COMPRESSION: + raise ValueError('cannot create object with no compression') + + def 
_create_compressor(self, compression_type): + self._validate_compression_type(compression_type) + return zlib.compressobj(9, zlib.DEFLATED, + self._type_mask[compression_type]) + + def _create_decompressor(self, compression_type): + self._validate_compression_type(compression_type) + return zlib.decompressobj(self._type_mask[compression_type]) + + def _readable(self): + mode = self.fileobj.mode + return 'r' in mode or 'a' in mode + + def _writeable(self): + mode = self.fileobj.mode + return 'w' in mode or 'a' in mode + + def write(self, data): + """Write data to file.""" + if not self.compressor: + raise ValueError('compressor not initialized') + compressed = self.compressor.compress(data) + if compressed: + self.fileobj.write(compressed) + + def _read(self, num_bytes): + """Decompress until num_bytes are buffered or EOF; return up to that.""" + while not num_bytes or len(self.data) < num_bytes: + buf = self.fileobj.read(self.read_size) + if not buf: + # EOF reached, flush. + self.data += self.decompressor.flush() + break + + self.data += self.decompressor.decompress(buf) + result = self.data[:num_bytes] + self.data = self.data[num_bytes:] + return result + + def read(self, num_bytes): + if not self.decompressor: + raise ValueError('decompressor not initialized') + return self._read(num_bytes) + + @property + def closed(self): + return not self.fileobj or self.fileobj.closed + + def close(self): + if self.closed: + return + + if self._writeable(): + self.fileobj.write(self.compressor.flush()) + self.fileobj.close() + + def flush(self): + if self._writeable(): + self.fileobj.write(self.compressor.flush(zlib.Z_SYNC_FLUSH)) + self.fileobj.flush() + + # TODO(slaven): Add support for seeking to a file position. + @property + def seekable(self): + return False + class FileSink(iobase.Sink): """A sink to a GCS or local files.
@@ -302,7 +418,8 @@ class FileSink(iobase.Sink): file_name_suffix='', num_shards=0, shard_name_template=None, - mime_type='application/octet-stream'): + mime_type='application/octet-stream', + compression_type=CompressionTypes.NO_COMPRESSION): if shard_name_template is None: shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE elif shard_name_template is '': @@ -311,8 +428,12 @@ class FileSink(iobase.Sink): self.file_name_suffix = file_name_suffix self.num_shards = num_shards self.coder = coder - self.mime_type = mime_type self.shard_name_format = self._template_to_format(shard_name_template) + if not CompressionTypes.is_valid_compression_type(compression_type): + raise TypeError('compression_type must be CompressionType object but ' + 'was %s' % type(compression_type)) + self.compression_type = compression_type + self.mime_type = CompressionTypes.mime_type(compression_type, mime_type) def open(self, temp_path): """Opens ``temp_path``, returning an opaque file handle object. @@ -320,7 +441,12 @@ class FileSink(iobase.Sink): The returned file handle is passed to ``write_[encoded_]record`` and ``close``. """ - return ChannelFactory.open(temp_path, 'wb', self.mime_type) + raw_file = ChannelFactory.open(temp_path, 'wb', self.mime_type) + if self.compression_type == CompressionTypes.NO_COMPRESSION: + return raw_file + else: + return _CompressedFile(fileobj=raw_file, + compression_type=self.compression_type) def write_record(self, file_handle, value): """Writes a single record go the file handle returned by ``open()``. 
@@ -508,36 +634,34 @@ class TextFileSink(FileSink): 'TextFileSink: file_name_suffix must be a string; got %r instead' % file_name_suffix) - if not CompressionTypes.valid_compression_type(compression_type): - raise TypeError('compression_type must be CompressionType object but ' - 'was %s' % type(compression_type)) - if compression_type == CompressionTypes.DEFLATE: - mime_type = 'application/x-gzip' - else: - mime_type = 'text/plain' - super(TextFileSink, self).__init__(file_path_prefix, file_name_suffix=file_name_suffix, num_shards=num_shards, shard_name_template=shard_name_template, coder=coder, - mime_type=mime_type) + mime_type='text/plain', + compression_type=compression_type) self.compression_type = compression_type self.append_trailing_newlines = append_trailing_newlines - def open(self, temp_path): - """Opens ''temp_path'', returning a writeable file object.""" - fobj = ChannelFactory.open(temp_path, 'wb', self.mime_type) - if self.compression_type == CompressionTypes.DEFLATE: - return gzip.GzipFile(fileobj=fobj) - return fobj - def write_encoded_record(self, file_handle, encoded_value): + """Writes a single encoded record.""" file_handle.write(encoded_value) if self.append_trailing_newlines: file_handle.write('\n') + def close(self, file_handle): + """Finalize and close the file handle returned from ``open()``. + + Args: + file_handle: file handle to be closed. + A ``None`` handle is ignored; any exception raised by the handle's + own ``close()`` propagates to the caller.
+ """ + if file_handle is not None: + file_handle.close() + class NativeTextFileSink(iobase.NativeSink): """A sink to a GCS or local text file or files.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1b3ac30/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index b77e1c1..e71ba6d 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -24,6 +24,7 @@ import logging import os import tempfile import unittest +import zlib import apache_beam as beam from apache_beam import coders @@ -347,7 +348,7 @@ class NativeTestTextFileSink(unittest.TestCase): self.assertEqual(f.read().splitlines(), lines) -class TestPureTextFileSink(unittest.TestCase): +class TestTextFileSink(unittest.TestCase): def setUp(self): self.lines = ['Line %d' % d for d in range(100)] @@ -366,14 +367,37 @@ class TestPureTextFileSink(unittest.TestCase): with open(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), self.lines) + def test_write_deflate_file(self): + sink = fileio.TextFileSink(self.path, + compression_type=fileio.CompressionTypes.DEFLATE) + self._write_lines(sink, self.lines) + + with open(self.path, 'rb') as f: + content = f.read() + self.assertEqual( + zlib.decompress(content, -zlib.MAX_WBITS).splitlines(), self.lines) + def test_write_gzip_file(self): - sink = fileio.TextFileSink( - self.path, compression_type=fileio.CompressionTypes.DEFLATE) + sink = fileio.TextFileSink(self.path, + compression_type=fileio.CompressionTypes.GZIP) self._write_lines(sink, self.lines) with gzip.GzipFile(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), self.lines) + def test_write_zlib_file(self): + sink = fileio.TextFileSink(self.path, + compression_type=fileio.CompressionTypes.ZLIB) + self._write_lines(sink, self.lines) + + with open(self.path, 'rb') as f: + 
content = f.read() + # MAX_WBITS accepts only a zlib header, so this also checks that the + # output is zlib-framed (not gzip or raw deflate). + self.assertEqual( + zlib.decompress(content, zlib.MAX_WBITS).splitlines(), + self.lines) + class MyFileSink(fileio.FileSink):
