Repository: beam
Updated Branches:
  refs/heads/python-sdk 36a7d3491 -> 678da7a8b
Moving from a string-based buffer to a cStringIO-based one in order to help
with performance of compressed sources.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee09668b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee09668b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee09668b

Branch: refs/heads/python-sdk
Commit: ee09668becaed21bfb15bac267a2df5e166bd6b9
Parents: 36a7d34
Author: Gus Katsiapis <[email protected]>
Authored: Fri Jan 13 18:28:12 2017 -0800
Committer: Robert Bradshaw <[email protected]>
Committed: Wed Jan 18 13:43:20 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py | 124 ++++++++++++++++++------------
 sdks/python/apache_beam/io/gcsio.py  |   4 +-
 2 files changed, 76 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ee09668b/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 6ea7844..ebc4fed 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -19,6 +19,7 @@ from __future__ import absolute_import
 
 import bz2
+import cStringIO
 import glob
 import logging
 from multiprocessing.pool import ThreadPool
@@ -68,6 +69,7 @@ class _CompressionType(object):
 
 class CompressionTypes(object):
   """Enum-like class representing known compression types."""
+
   # Detect compression based on filename extension.
   #
   # The following extensions are currently recognized by auto-detection:
@@ -642,9 +644,9 @@ class ChannelFactory(object):
 
 class _CompressedFile(object):
   """Somewhat limited file wrapper for easier handling of compressed files."""
-  # The bit mask to use for the wbits parameters of the zlib compressor and
+  # The bit mask to use for the wbits parameters of the zlib compressor and
   # decompressor objects.
-  _gzip_mask = zlib.MAX_WBITS | 16
+  _gzip_mask = zlib.MAX_WBITS | 16  # Mask when using GZIP headers.
 
   def __init__(self,
                fileobj,
@@ -652,49 +654,47 @@ class _CompressedFile(object):
                read_size=gcsio.DEFAULT_READ_BUFFER_SIZE):
     if not fileobj:
       raise ValueError('fileobj must be opened file but was %s' % fileobj)
-    self._validate_compression_type(compression_type)
+
+    if not CompressionTypes.is_valid_compression_type(compression_type):
+      raise TypeError('compression_type must be CompressionType object but '
+                      'was %s' % type(compression_type))
+    if compression_type in (CompressionTypes.AUTO, CompressionTypes.UNCOMPRESSED
+                           ):
+      raise ValueError(
+          'Cannot create object with unspecified or no compression')
 
     self._file = fileobj
-    self._data = ''
-    self._read_size = read_size
     self._compression_type = compression_type
 
-    if self._readable():
+    if self.readable():
+      self._read_size = read_size
+      self._read_buffer = cStringIO.StringIO()
+      self._read_position = 0
+      self._read_eof = False
+
       if self._compression_type == CompressionTypes.BZIP2:
         self._decompressor = bz2.BZ2Decompressor()
       else:
+        assert self._compression_type == CompressionTypes.GZIP
         self._decompressor = zlib.decompressobj(self._gzip_mask)
-      self._read_eof = False
     else:
       self._decompressor = None
 
-    if self._writeable():
+    if self.writeable():
       if self._compression_type == CompressionTypes.BZIP2:
         self._compressor = bz2.BZ2Compressor()
       else:
+        assert self._compression_type == CompressionTypes.GZIP
         self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                                             zlib.DEFLATED, self._gzip_mask)
     else:
       self._compressor = None
 
-  def _validate_compression_type(self, compression_type):
-    if not CompressionTypes.is_valid_compression_type(compression_type):
-      raise TypeError('compression_type must be CompressionType object but '
-                      'was %s' % type(compression_type))
-    if compression_type in (CompressionTypes.AUTO, CompressionTypes.UNCOMPRESSED
-                           ):
-      raise ValueError(
-          'Cannot create object with unspecified or no compression')
-    if compression_type not in (CompressionTypes.BZIP2, CompressionTypes.GZIP):
-      raise ValueError(
-          'compression_type %s not supported for whole-file compression',
-          compression_type)
-
-  def _readable(self):
+  def readable(self):
     mode = self._file.mode
     return 'r' in mode or 'a' in mode
 
-  def _writeable(self):
+  def writeable(self):
     mode = self._file.mode
     return 'w' in mode or 'a' in mode
@@ -708,10 +708,27 @@ class _CompressedFile(object):
 
   def _fetch_to_internal_buffer(self, num_bytes):
     """Fetch up to num_bytes into the internal buffer."""
-    while not self._read_eof and len(self._data) < num_bytes:
+    if (not self._read_eof and self._read_position > 0 and
+        (self._read_buffer.tell() - self._read_position) < num_bytes):
+      # There aren't enough number of bytes to accommodate a read, so we
+      # prepare for a possibly large read by clearing up all internal buffers
+      # but without dropping any previous held data.
+      self._read_buffer.seek(self._read_position)
+      data = self._read_buffer.read()
+      self._read_position = 0
+      self._read_buffer.seek(0)
+      self._read_buffer.truncate(0)
+      self._read_buffer.write(data)
+
+    while not self._read_eof and (self._read_buffer.tell() - self._read_position
+                                 ) < num_bytes:
+      # Continue reading from the underlying file object until enough bytes are
+      # available, or EOF is reached.
       buf = self._file.read(self._read_size)
       if buf:
-        self._data += self._decompressor.decompress(buf)
+        decompressed = self._decompressor.decompress(buf)
+        del buf  # Free up some possibly large and no-longer-needed memory.
+        self._read_buffer.write(decompressed)
       else:
         # EOF reached.
         # Verify completeness and no corruption and flush (if needed by
@@ -728,62 +745,67 @@ class _CompressedFile(object):
       except EOFError:
         pass  # All is as expected!
     else:
-      self._data += self._decompressor.flush()
+      self._read_buffer.write(self._decompressor.flush())
+
     # Record that we have hit the end of file, so we won't unnecessarily
     # repeat the completeness verification step above.
     self._read_eof = True
-    return
 
-  def _read_from_internal_buffer(self, num_bytes):
-    """Read up to num_bytes from the internal buffer."""
-    # TODO: this can be optimized to avoid a string copy operation.
-    result = self._data[:num_bytes]
-    self._data = self._data[num_bytes:]
+  def _read_from_internal_buffer(self, read_fn):
+    """Read from the internal buffer by using the supplied read_fn."""
+    self._read_buffer.seek(self._read_position)
+    result = read_fn()
+    self._read_position += len(result)
+    self._read_buffer.seek(0, os.SEEK_END)  # Allow future writes.
    return result
 
   def read(self, num_bytes):
     if not self._decompressor:
       raise ValueError('decompressor not initialized')
+
     self._fetch_to_internal_buffer(num_bytes)
-    return self._read_from_internal_buffer(num_bytes)
+    return self._read_from_internal_buffer(
+        lambda: self._read_buffer.read(num_bytes))
 
   def readline(self):
     """Equivalent to standard file.readline(). Same return conventions apply."""
     if not self._decompressor:
       raise ValueError('decompressor not initialized')
-    result = ''
+
+    io = cStringIO.StringIO()
     while True:
-      self._fetch_to_internal_buffer(self._read_size)
-      if not self._data:
-        break  # EOF reached.
-      index = self._data.find('\n')
-      if index == -1:
-        result += self._read_from_internal_buffer(len(self._data))
-      else:
-        result += self._read_from_internal_buffer(index + 1)
-        break  # Newline reached.
-    return result
+      # Ensure that the internal buffer has at least half the read_size. Going
+      # with half the _read_size (as opposed to a full _read_size) to ensure
+      # that actual fetches are more evenly spread out, as opposed to having 2
+      # consecutive reads at the beginning of a read.
+      self._fetch_to_internal_buffer(self._read_size / 2)
+      line = self._read_from_internal_buffer(
+          lambda: self._read_buffer.readline())
+      io.write(line)
+      if line.endswith('\n') or not line:
+        break  # Newline or EOF reached.
+
+    return io.getvalue()
 
-  @property
   def closed(self):
     return not self._file or self._file.closed()
 
   def close(self):
-    if self._file is None:
-      return
+    if self.readable():
+      self._read_buffer.close()
 
-    if self._writeable():
+    if self.writeable():
       self._file.write(self._compressor.flush())
+
     self._file.close()
 
   def flush(self):
-    if self._writeable():
+    if self.writeable():
       self._file.write(self._compressor.flush())
     self._file.flush()
 
-  # TODO: Add support for seeking to a file position.
-  @property
   def seekable(self):
+    # TODO: Add support for seeking to a file position.
     return False
 
   def __enter__(self):
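----------------------------------------------------------------------
For context on the fileio.py changes above, here is a minimal, runnable
sketch of the single-buffer pattern that _CompressedFile now relies on.
It is not part of the commit; _BufferSketch and its methods are
illustrative stand-ins for the bookkeeping done in
_fetch_to_internal_buffer and _read_from_internal_buffer. One
cStringIO.StringIO object serves as both write target and read source:
the stream position marks the end of buffered data, a separate
_read_position marks how much the caller has consumed, and consumed
bytes are compacted away before large fetches instead of re-slicing a
string on every read. (Python 2, matching the SDK at the time.)

import cStringIO
import os


class _BufferSketch(object):
  """Hypothetical stand-in for _CompressedFile's read-buffer bookkeeping."""

  def __init__(self):
    self._read_buffer = cStringIO.StringIO()
    self._read_position = 0  # Bytes already handed back to the caller.

  def available(self):
    # Unconsumed bytes; the stream position doubles as "end of buffered data".
    return self._read_buffer.tell() - self._read_position

  def compact(self):
    # Drop consumed bytes so the buffer cannot grow without bound, while
    # keeping any data that has not been read yet.
    self._read_buffer.seek(self._read_position)
    data = self._read_buffer.read()
    self._read_position = 0
    self._read_buffer.seek(0)
    self._read_buffer.truncate(0)
    self._read_buffer.write(data)

  def write(self, data):
    self._read_buffer.write(data)  # Appends; position stays at the end.

  def read(self, num_bytes):
    # Seek to the read cursor, consume, then return the stream position to
    # the end so that future writes append rather than overwrite.
    self._read_buffer.seek(self._read_position)
    result = self._read_buffer.read(num_bytes)
    self._read_position += len(result)
    self._read_buffer.seek(0, os.SEEK_END)
    return result


buf = _BufferSketch()
buf.write('hello ')
buf.write('world')
assert buf.read(5) == 'hello'
buf.compact()
assert buf.available() == 6 and buf.read(6) == ' world'
----------------------------------------------------------------------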

http://git-wip-us.apache.org/repos/asf/beam/blob/ee09668b/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index d1fac66..0d18cc0 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -385,6 +385,7 @@ class GcsIO(object):
     return file_sizes
 
 
+# TODO: Consider using cStringIO instead of buffers and data_lists when reading.
 class GcsBufferedReader(object):
   """A class for reading Google Cloud Storage files."""
@@ -398,7 +399,6 @@ class GcsBufferedReader(object):
     self.bucket, self.name = parse_gcs_path(path)
     self.mode = mode
     self.buffer_size = buffer_size
-    self.mode = mode
 
     # Get object state.
     get_request = (storage.StorageObjectsGetRequest(
@@ -627,6 +627,8 @@ class GcsBufferedReader(object):
     return False
 
 
+# TODO: Consider using cStringIO instead of buffers and data_lists when reading
+# and writing.
 class GcsBufferedWriter(object):
   """A class for writing Google Cloud Storage files."""
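----------------------------------------------------------------------
The TODOs above echo the motivation behind the fileio.py change. As a
rough illustration of the cost being avoided (not from the commit;
consume_str and consume_cstringio are hypothetical names, and timings
vary by machine): the old code consumed its buffer with
self._data = self._data[num_bytes:], which copies the entire remaining
string on every read and so makes a full pass over the buffer roughly
quadratic, whereas cStringIO just advances an internal cursor.

import cStringIO
import timeit

DATA = 'x' * (1024 * 1024)  # 1 MB, consumed in 1 KB reads.


def consume_str(data, chunk=1024):
  while data:
    data = data[chunk:]  # Copies the whole remainder on every read.


def consume_cstringio(data, chunk=1024):
  buf = cStringIO.StringIO(data)
  while buf.read(chunk):
    pass  # Each read merely advances an internal cursor.


print 'str slicing: %.3fs' % timeit.timeit(lambda: consume_str(DATA), number=3)
print 'cStringIO:   %.3fs' % timeit.timeit(
    lambda: consume_cstringio(DATA), number=3)
----------------------------------------------------------------------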

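----------------------------------------------------------------------
Finally, for readers unfamiliar with the _gzip_mask constant whose
comment the commit clarifies: passing zlib.MAX_WBITS | 16 as the wbits
argument tells zlib to emit and expect gzip headers and trailers, which
is what lets _CompressedFile interoperate with .gz files. A small
demonstration (not part of the commit; GZIP_MASK mirrors the class
attribute, and the compressobj/decompressobj calls are taken from the
diff above):

import zlib

GZIP_MASK = zlib.MAX_WBITS | 16  # wbits value selecting gzip framing.

compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED,
                              GZIP_MASK)
payload = compressor.compress('hello gzip\n') + compressor.flush()
assert payload.startswith('\x1f\x8b')  # Standard gzip magic bytes.

decompressor = zlib.decompressobj(GZIP_MASK)
assert decompressor.decompress(payload) == 'hello gzip\n'
----------------------------------------------------------------------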