Repository: beam Updated Branches: refs/heads/master 645d0bba9 -> e2a2836ad
[BEAM-778] Make filesystem._CompressedFile seekable. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10e5a22b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10e5a22b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10e5a22b Branch: refs/heads/master Commit: 10e5a22b8479bff249c708194e327137458ca2b3 Parents: 645d0bb Author: Tibor Kiss <[email protected]> Authored: Fri Mar 31 07:11:07 2017 +0200 Committer: [email protected] <[email protected]> Committed: Wed Apr 5 09:51:42 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio_test.py | 31 --- sdks/python/apache_beam/io/filesystem.py | 122 ++++++++++-- sdks/python/apache_beam/io/filesystem_test.py | 221 +++++++++++++++++++++ sdks/python/setup.py | 2 +- 4 files changed, 327 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/10e5a22b/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 504a2b9..2409873 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -30,7 +30,6 @@ import hamcrest as hc import apache_beam as beam from apache_beam import coders from apache_beam.io import fileio -from apache_beam.io.filesystem import CompressedFile from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher @@ -70,36 +69,6 @@ class _TestCaseWithTempDirCleanUp(unittest.TestCase): return file_name -class TestCompressedFile(_TestCaseWithTempDirCleanUp): - - def test_seekable(self): - readable = CompressedFile(open(self._create_temp_file(), 'r')) - 
self.assertFalse(readable.seekable) - - writeable = CompressedFile(open(self._create_temp_file(), 'w')) - self.assertFalse(writeable.seekable) - - def test_tell(self): - lines = ['line%d\n' % i for i in range(10)] - tmpfile = self._create_temp_file() - writeable = CompressedFile(open(tmpfile, 'w')) - current_offset = 0 - for line in lines: - writeable.write(line) - current_offset += len(line) - self.assertEqual(current_offset, writeable.tell()) - - writeable.close() - readable = CompressedFile(open(tmpfile)) - current_offset = 0 - while True: - line = readable.readline() - current_offset += len(line) - self.assertEqual(current_offset, readable.tell()) - if not line: - break - - class MyFileSink(fileio.FileSink): def open(self, temp_path): http://git-wip-us.apache.org/repos/asf/beam/blob/10e5a22b/sdks/python/apache_beam/io/filesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index b0d2f48..e6c3c29 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -23,7 +23,10 @@ import bz2 import cStringIO import os import zlib +import logging +import time +logger = logging.getLogger(__name__) DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024 @@ -79,7 +82,8 @@ class CompressionTypes(object): class CompressedFile(object): - """Somewhat limited file wrapper for easier handling of compressed files.""" + """File wrapper for easier handling of compressed files.""" + # XXX: This class is not thread safe in the read path. # The bit mask to use for the wbits parameters of the zlib compressor and # decompressor objects. 
@@ -107,6 +111,7 @@ class CompressedFile(object): raise ValueError('File object must be at position 0 but was %d' % self._file.tell()) self._uncompressed_position = 0 + self._uncompressed_size = None if self.readable(): self._read_size = read_size @@ -114,24 +119,30 @@ class CompressedFile(object): self._read_position = 0 self._read_eof = False - if self._compression_type == CompressionTypes.BZIP2: - self._decompressor = bz2.BZ2Decompressor() - else: - assert self._compression_type == CompressionTypes.GZIP - self._decompressor = zlib.decompressobj(self._gzip_mask) + self._initialize_decompressor() else: self._decompressor = None if self.writeable(): - if self._compression_type == CompressionTypes.BZIP2: - self._compressor = bz2.BZ2Compressor() - else: - assert self._compression_type == CompressionTypes.GZIP - self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, - zlib.DEFLATED, self._gzip_mask) + self._initialize_compressor() else: self._compressor = None + def _initialize_decompressor(self): + if self._compression_type == CompressionTypes.BZIP2: + self._decompressor = bz2.BZ2Decompressor() + else: + assert self._compression_type == CompressionTypes.GZIP + self._decompressor = zlib.decompressobj(self._gzip_mask) + + def _initialize_compressor(self): + if self._compression_type == CompressionTypes.BZIP2: + self._compressor = bz2.BZ2Compressor() + else: + assert self._compression_type == CompressionTypes.GZIP + self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, + zlib.DEFLATED, self._gzip_mask) + def readable(self): mode = self._file.mode return 'r' in mode or 'a' in mode @@ -158,9 +169,7 @@ class CompressedFile(object): # but without dropping any previous held data. 
self._read_buffer.seek(self._read_position) data = self._read_buffer.read() - self._read_position = 0 - self._read_buffer.seek(0) - self._read_buffer.truncate(0) + self._clear_read_buffer() self._read_buffer.write(data) while not self._read_eof and (self._read_buffer.tell() - self._read_position @@ -250,8 +259,87 @@ class CompressedFile(object): @property def seekable(self): - # TODO: Add support for seeking to a file position. - return False + return self._file.mode == 'r' + + def _clear_read_buffer(self): + """Clears the read buffer by removing all the contents and + resetting _read_position to 0""" + self._read_position = 0 + self._read_buffer.seek(0) + self._read_buffer.truncate(0) + + def _rewind_file(self): + """Seeks to the beginning of the input file. The _read_eof flag + is cleared and _uncompressed_position is reset to zero""" + self._file.seek(0, os.SEEK_SET) + self._read_eof = False + self._uncompressed_position = 0 + + def _rewind(self): + """Seeks to the beginning of the input file and resets the internal read + buffer. The decompressor object is re-initialized to ensure that no data + is left in its buffer.""" + self._clear_read_buffer() + self._rewind_file() + + # Re-initialize decompressor to clear any data buffered prior to rewind + self._initialize_decompressor() + + def seek(self, offset, whence=os.SEEK_SET): + """Set the file's current offset. + + Seeking behavior: + * seeking from the end (SEEK_END) the whole file is decompressed once to + determine its size. Therefore it is preferred to use + SEEK_SET or SEEK_CUR to avoid the processing overhead + * seeking backwards from the current position rewinds the file to 0 + and decompresses the chunks to the requested offset + * seeking is only supported in files opened for reading + * if the new offset is out of bounds, it is adjusted to either 0 or EOF. + + Args: + offset: seek offset in the uncompressed content represented as number + whence: seek mode.
Supported modes are os.SEEK_SET (absolute seek), + os.SEEK_CUR (seek relative to the current position), and os.SEEK_END + (seek relative to the end, offset should be negative). + + Raises: + IOError: When this buffer is closed. + ValueError: When whence is invalid or the file is not seekable + """ + if whence == os.SEEK_SET: + absolute_offset = offset + elif whence == os.SEEK_CUR: + absolute_offset = self._uncompressed_position + offset + elif whence == os.SEEK_END: + # Determine and cache the uncompressed size of the file + if not self._uncompressed_size: + logger.warn("Seeking relative from end of file is requested. " + "Need to decompress the whole file once to determine " + "its size. This might take a while...") + uncompress_start_time = time.time() + while self.read(self._read_size): + pass + uncompress_end_time = time.time() + logger.warn("Full file decompression for seek from end took %.2f secs", + (uncompress_end_time - uncompress_start_time)) + self._uncompressed_size = self._uncompressed_position + absolute_offset = self._uncompressed_size + offset + else: + raise ValueError("Whence mode %r is invalid." % whence) + + # Determine how many bytes need to be read before we reach + # the requested offset. Rewind if we already passed the position. + if absolute_offset < self._uncompressed_position: + self._rewind() + bytes_to_skip = absolute_offset - self._uncompressed_position + + # Read until the desired position is reached or EOF occurs.
+ while bytes_to_skip: + data = self.read(min(self._read_size, bytes_to_skip)) + if not data: + break + bytes_to_skip -= len(data) def tell(self): """Returns current position in uncompressed file.""" http://git-wip-us.apache.org/repos/asf/beam/blob/10e5a22b/sdks/python/apache_beam/io/filesystem_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py new file mode 100644 index 0000000..168925d --- /dev/null +++ b/sdks/python/apache_beam/io/filesystem_test.py @@ -0,0 +1,221 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for filesystem module.""" +import shutil +import os +import unittest +import tempfile +import bz2 +import gzip +from StringIO import StringIO + +from apache_beam.io.filesystem import CompressedFile, CompressionTypes + + +class _TestCaseWithTempDirCleanUp(unittest.TestCase): + """Base class for TestCases that deals with TempDir clean-up. + + Inherited test cases will call self._new_tempdir() to start a temporary dir + which will be deleted at the end of the tests (when tearDown() is called). 
+ """ + + def setUp(self): + self._tempdirs = [] + + def tearDown(self): + for path in self._tempdirs: + if os.path.exists(path): + shutil.rmtree(path) + self._tempdirs = [] + + def _new_tempdir(self): + result = tempfile.mkdtemp() + self._tempdirs.append(result) + return result + + def _create_temp_file(self, name='', suffix=''): + if not name: + name = tempfile.template + file_name = tempfile.NamedTemporaryFile( + delete=False, prefix=name, + dir=self._new_tempdir(), suffix=suffix).name + return file_name + + +class TestCompressedFile(_TestCaseWithTempDirCleanUp): + content = """- the BEAM - +How things really are we would like to know. +Does + Time + flow, is it elastic, or is it +atomized in instants hammered around the + clock's face? ... +- May Swenson""" + + # Keep the read block size small so that we exercise the seek functionality + # in compressed file and not just in the internal buffer + read_block_size = 4 + + def _create_compressed_file(self, compression_type, content, + name='', suffix=''): + file_name = self._create_temp_file(name, suffix) + + if compression_type == CompressionTypes.BZIP2: + compress_factory = bz2.BZ2File + elif compression_type == CompressionTypes.GZIP: + compress_factory = gzip.open + else: + assert False, "Invalid compression type: %s" % compression_type + + with compress_factory(file_name, 'w') as f: + f.write(content) + + return file_name + + def test_seekable_enabled_on_read(self): + readable = CompressedFile(open(self._create_temp_file(), 'r')) + self.assertTrue(readable.seekable) + + def test_seekable_disabled_on_write(self): + writeable = CompressedFile(open(self._create_temp_file(), 'w')) + self.assertFalse(writeable.seekable) + + def test_seekable_disabled_on_append(self): + writeable = CompressedFile(open(self._create_temp_file(), 'a')) + self.assertFalse(writeable.seekable) + + def test_seek_set(self): + for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]: + file_name = 
self._create_compressed_file(compression_type, self.content) + + compressed_fd = CompressedFile(open(file_name, 'r'), compression_type, + read_size=self.read_block_size) + reference_fd = StringIO(self.content) + + # Note: content (readline) check must come before position (tell) check + # because cStringIO's tell() reports out of bound positions (if we seek + # beyond the file) up until a real read occurs. + # _CompressedFile.tell() always stays within the bounds of the + # uncompressed content. + for seek_position in (-1, 0, 1, + len(self.content)-1, len(self.content), + len(self.content) + 1): + compressed_fd.seek(seek_position, os.SEEK_SET) + reference_fd.seek(seek_position, os.SEEK_SET) + + uncompressed_line = compressed_fd.readline() + reference_line = reference_fd.readline() + self.assertEqual(uncompressed_line, reference_line) + + uncompressed_position = compressed_fd.tell() + reference_position = reference_fd.tell() + self.assertEqual(uncompressed_position, reference_position) + + def test_seek_cur(self): + for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]: + file_name = self._create_compressed_file(compression_type, self.content) + + compressed_fd = CompressedFile(open(file_name, 'r'), compression_type, + read_size=self.read_block_size) + reference_fd = StringIO(self.content) + + # Test out of bound, inbound seeking in both directions + for seek_position in (-1, 0, 1, + len(self.content) / 2, + len(self.content) / 2, + -1 * len(self.content) / 2): + compressed_fd.seek(seek_position, os.SEEK_CUR) + reference_fd.seek(seek_position, os.SEEK_CUR) + + uncompressed_line = compressed_fd.readline() + expected_line = reference_fd.readline() + self.assertEqual(uncompressed_line, expected_line) + + reference_position = reference_fd.tell() + uncompressed_position = compressed_fd.tell() + self.assertEqual(uncompressed_position, reference_position) + + def test_read_from_end_returns_no_data(self): + for compression_type in [CompressionTypes.BZIP2, 
CompressionTypes.GZIP]: + file_name = self._create_compressed_file(compression_type, self.content) + + compressed_fd = CompressedFile(open(file_name, 'r'), compression_type, + read_size=self.read_block_size) + + seek_position = 0 + compressed_fd.seek(seek_position, os.SEEK_END) + + expected_data = '' + uncompressed_data = compressed_fd.read(10) + + self.assertEqual(uncompressed_data, expected_data) + + def test_seek_outside(self): + for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]: + file_name = self._create_compressed_file(compression_type, self.content) + + compressed_fd = CompressedFile(open(file_name, 'r'), compression_type, + read_size=self.read_block_size) + + for whence in (os.SEEK_CUR, os.SEEK_SET, os.SEEK_END): + seek_position = -1 * len(self.content) - 10 + compressed_fd.seek(seek_position, whence) + + expected_position = 0 + uncompressed_position = compressed_fd.tell() + self.assertEqual(uncompressed_position, expected_position) + + seek_position = len(self.content) + 20 + compressed_fd.seek(seek_position, whence) + + expected_position = len(self.content) + uncompressed_position = compressed_fd.tell() + self.assertEqual(uncompressed_position, expected_position) + + def test_read_and_seek_back_to_beginning(self): + for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]: + file_name = self._create_compressed_file(compression_type, self.content) + compressed_fd = CompressedFile(open(file_name, 'r'), compression_type, + read_size=self.read_block_size) + + first_pass = compressed_fd.readline() + compressed_fd.seek(0, os.SEEK_SET) + second_pass = compressed_fd.readline() + + self.assertEqual(first_pass, second_pass) + + def test_tell(self): + lines = ['line%d\n' % i for i in range(10)] + tmpfile = self._create_temp_file() + writeable = CompressedFile(open(tmpfile, 'w')) + current_offset = 0 + for line in lines: + writeable.write(line) + current_offset += len(line) + self.assertEqual(current_offset, writeable.tell()) + + 
writeable.close() + readable = CompressedFile(open(tmpfile)) + current_offset = 0 + while True: + line = readable.readline() + current_offset += len(line) + self.assertEqual(current_offset, readable.tell()) + if not line: + break http://git-wip-us.apache.org/repos/asf/beam/blob/10e5a22b/sdks/python/setup.py ---------------------------------------------------------------------- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index ee3b5e4..4e1c67d 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -98,7 +98,7 @@ REQUIRED_PACKAGES = [ ] REQUIRED_TEST_PACKAGES = [ - 'pyhamcrest>=1.9,<2.0', + 'pyhamcrest>=1.9,<2.0' ] GCP_REQUIREMENTS = [
