Repository: beam
Updated Branches:
  refs/heads/master 661c06652 -> 0a8ac3528


[BEAM-778] Fix the Compressed file seek tests on windows


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aaae9d77
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aaae9d77
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aaae9d77

Branch: refs/heads/master
Commit: aaae9d776e958d6d891c2d2c635d0164f07132a1
Parents: 661c066
Author: Sourabh Bajaj <[email protected]>
Authored: Thu Apr 6 16:23:33 2017 -0700
Committer: [email protected] <[email protected]>
Committed: Fri Apr 7 11:10:41 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filesystem.py      |   2 +-
 sdks/python/apache_beam/io/filesystem_test.py | 242 ++++++++++-----------
 2 files changed, 118 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/aaae9d77/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index e6c3c29..85c7f06 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -259,7 +259,7 @@ class CompressedFile(object):
 
   @property
   def seekable(self):
-    return self._file.mode == 'r'
+    return 'r' in self._file.mode
 
   def _clear_read_buffer(self):
     """Clears the read buffer by removing all the contents and

http://git-wip-us.apache.org/repos/asf/beam/blob/aaae9d77/sdks/python/apache_beam/io/filesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
index 168925d..607393d 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -17,48 +17,23 @@
 #
 
 """Unit tests for filesystem module."""
 
-import shutil
+import bz2
+import gzip
 import os
 import unittest
 import tempfile
-import bz2
-import gzip
 from StringIO import StringIO
 from apache_beam.io.filesystem import CompressedFile, CompressionTypes
 
 
-class _TestCaseWithTempDirCleanUp(unittest.TestCase):
+class TestCompressedFile(unittest.TestCase):
   """Base class for TestCases that deals with TempDir clean-up.
 
   Inherited test cases will call self._new_tempdir() to start a temporary dir
   which will be deleted at the end of the tests (when tearDown() is called).
   """
 
-  def setUp(self):
-    self._tempdirs = []
-
-  def tearDown(self):
-    for path in self._tempdirs:
-      if os.path.exists(path):
-        shutil.rmtree(path)
-    self._tempdirs = []
-
-  def _new_tempdir(self):
-    result = tempfile.mkdtemp()
-    self._tempdirs.append(result)
-    return result
-
-  def _create_temp_file(self, name='', suffix=''):
-    if not name:
-      name = tempfile.template
-    file_name = tempfile.NamedTemporaryFile(
-        delete=False, prefix=name,
-        dir=self._new_tempdir(), suffix=suffix).name
-    return file_name
-
-
-class TestCompressedFile(_TestCaseWithTempDirCleanUp):
   content = """- the BEAM -
 How things really are we would like to know.
 Does
@@ -72,9 +47,21 @@ atomized in instants hammered around the
   # in compressed file and not just in the internal buffer
   read_block_size = 4
 
-  def _create_compressed_file(self, compression_type, content,
-                              name='', suffix=''):
-    file_name = self._create_temp_file(name, suffix)
+  def setUp(self):
+    self._tempfiles = []
+
+  def tearDown(self):
+    for path in self._tempfiles:
+      if os.path.exists(path):
+        os.remove(path)
+
+  def _create_temp_file(self):
+    path = tempfile.NamedTemporaryFile(delete=False).name
+    self._tempfiles.append(path)
+    return path
+
+  def _create_compressed_file(self, compression_type, content):
+    file_name = self._create_temp_file()
 
     if compression_type == CompressionTypes.BZIP2:
       compress_factory = bz2.BZ2File
@@ -83,139 +70,144 @@ atomized in instants hammered around the
     else:
       assert False, "Invalid compression type: %s" % compression_type
 
-    with compress_factory(file_name, 'w') as f:
+    with compress_factory(file_name, 'wb') as f:
       f.write(content)
 
     return file_name
 
   def test_seekable_enabled_on_read(self):
-    readable = CompressedFile(open(self._create_temp_file(), 'r'))
-    self.assertTrue(readable.seekable)
+    with open(self._create_temp_file(), 'rb') as f:
+      readable = CompressedFile(f)
+      self.assertTrue(readable.seekable)
 
   def test_seekable_disabled_on_write(self):
-    writeable = CompressedFile(open(self._create_temp_file(), 'w'))
-    self.assertFalse(writeable.seekable)
+    with open(self._create_temp_file(), 'wb') as f:
+      writeable = CompressedFile(f)
+      self.assertFalse(writeable.seekable)
 
   def test_seekable_disabled_on_append(self):
-    writeable = CompressedFile(open(self._create_temp_file(), 'a'))
-    self.assertFalse(writeable.seekable)
+    with open(self._create_temp_file(), 'ab') as f:
+      writeable = CompressedFile(f)
+      self.assertFalse(writeable.seekable)
 
   def test_seek_set(self):
     for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
-
-      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
-                                     read_size=self.read_block_size)
-      reference_fd = StringIO(self.content)
-
-      # Note: content (readline) check must come before position (tell) check
-      # because cStringIO's tell() reports out of bound positions (if we seek
-      # beyond the file) up until a real read occurs.
-      # _CompressedFile.tell() always stays within the bounds of the
-      # uncompressed content.
-      for seek_position in (-1, 0, 1,
-                            len(self.content)-1, len(self.content),
-                            len(self.content) + 1):
-        compressed_fd.seek(seek_position, os.SEEK_SET)
-        reference_fd.seek(seek_position, os.SEEK_SET)
-
-        uncompressed_line = compressed_fd.readline()
-        reference_line = reference_fd.readline()
-        self.assertEqual(uncompressed_line, reference_line)
-
-        uncompressed_position = compressed_fd.tell()
-        reference_position = reference_fd.tell()
-        self.assertEqual(uncompressed_position, reference_position)
+      with open(file_name, 'rb') as f:
+        compressed_fd = CompressedFile(f, compression_type,
+                                       read_size=self.read_block_size)
+        reference_fd = StringIO(self.content)
+
+        # Note: content (readline) check must come before position (tell) check
+        # because cStringIO's tell() reports out of bound positions (if we seek
+        # beyond the file) up until a real read occurs.
+        # _CompressedFile.tell() always stays within the bounds of the
+        # uncompressed content.
+        for seek_position in (-1, 0, 1,
+                              len(self.content)-1, len(self.content),
+                              len(self.content) + 1):
+          compressed_fd.seek(seek_position, os.SEEK_SET)
+          reference_fd.seek(seek_position, os.SEEK_SET)
+
+          uncompressed_line = compressed_fd.readline()
+          reference_line = reference_fd.readline()
+          self.assertEqual(uncompressed_line, reference_line)
+
+          uncompressed_position = compressed_fd.tell()
+          reference_position = reference_fd.tell()
+          self.assertEqual(uncompressed_position, reference_position)
 
   def test_seek_cur(self):
     for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
-
-      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
-                                     read_size=self.read_block_size)
-      reference_fd = StringIO(self.content)
-
-      # Test out of bound, inbound seeking in both directions
-      for seek_position in (-1, 0, 1,
-                            len(self.content) / 2,
-                            len(self.content) / 2,
-                            -1 * len(self.content) / 2):
-        compressed_fd.seek(seek_position, os.SEEK_CUR)
-        reference_fd.seek(seek_position, os.SEEK_CUR)
-
-        uncompressed_line = compressed_fd.readline()
-        expected_line = reference_fd.readline()
-        self.assertEqual(uncompressed_line, expected_line)
-
-        reference_position = reference_fd.tell()
-        uncompressed_position = compressed_fd.tell()
-        self.assertEqual(uncompressed_position, reference_position)
+      with open(file_name, 'rb') as f:
+        compressed_fd = CompressedFile(f, compression_type,
+                                       read_size=self.read_block_size)
+        reference_fd = StringIO(self.content)
+
+        # Test out of bound, inbound seeking in both directions
+        for seek_position in (-1, 0, 1,
+                              len(self.content) / 2,
+                              len(self.content) / 2,
+                              -1 * len(self.content) / 2):
+          compressed_fd.seek(seek_position, os.SEEK_CUR)
+          reference_fd.seek(seek_position, os.SEEK_CUR)
+
+          uncompressed_line = compressed_fd.readline()
+          expected_line = reference_fd.readline()
+          self.assertEqual(uncompressed_line, expected_line)
+
+          reference_position = reference_fd.tell()
+          uncompressed_position = compressed_fd.tell()
+          self.assertEqual(uncompressed_position, reference_position)
 
   def test_read_from_end_returns_no_data(self):
     for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
+      with open(file_name, 'rb') as f:
+        compressed_fd = CompressedFile(f, compression_type,
+                                       read_size=self.read_block_size)
 
-      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
-                                     read_size=self.read_block_size)
-
-      seek_position = 0
-      compressed_fd.seek(seek_position, os.SEEK_END)
+        seek_position = 0
+        compressed_fd.seek(seek_position, os.SEEK_END)
 
-      expected_data = ''
-      uncompressed_data = compressed_fd.read(10)
+        expected_data = ''
+        uncompressed_data = compressed_fd.read(10)
 
-      self.assertEqual(uncompressed_data, expected_data)
+        self.assertEqual(uncompressed_data, expected_data)
 
   def test_seek_outside(self):
     for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
+      with open(file_name, 'rb') as f:
+        compressed_fd = CompressedFile(f, compression_type,
+                                       read_size=self.read_block_size)
 
-      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
-                                     read_size=self.read_block_size)
-
-      for whence in (os.SEEK_CUR, os.SEEK_SET, os.SEEK_END):
-        seek_position = -1 * len(self.content) - 10
-        compressed_fd.seek(seek_position, whence)
+        for whence in (os.SEEK_CUR, os.SEEK_SET, os.SEEK_END):
+          seek_position = -1 * len(self.content) - 10
+          compressed_fd.seek(seek_position, whence)
 
-        expected_position = 0
-        uncompressed_position = compressed_fd.tell()
-        self.assertEqual(uncompressed_position, expected_position)
+          expected_position = 0
+          uncompressed_position = compressed_fd.tell()
+          self.assertEqual(uncompressed_position, expected_position)
 
-        seek_position = len(self.content) + 20
-        compressed_fd.seek(seek_position, whence)
+          seek_position = len(self.content) + 20
+          compressed_fd.seek(seek_position, whence)
 
-        expected_position = len(self.content)
-        uncompressed_position = compressed_fd.tell()
-        self.assertEqual(uncompressed_position, expected_position)
+          expected_position = len(self.content)
+          uncompressed_position = compressed_fd.tell()
+          self.assertEqual(uncompressed_position, expected_position)
 
   def test_read_and_seek_back_to_beginning(self):
     for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
-      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
-                                     read_size=self.read_block_size)
+      with open(file_name, 'rb') as f:
+        compressed_fd = CompressedFile(f, compression_type,
+                                       read_size=self.read_block_size)
 
-      first_pass = compressed_fd.readline()
-      compressed_fd.seek(0, os.SEEK_SET)
-      second_pass = compressed_fd.readline()
+        first_pass = compressed_fd.readline()
+        compressed_fd.seek(0, os.SEEK_SET)
+        second_pass = compressed_fd.readline()
 
-      self.assertEqual(first_pass, second_pass)
+        self.assertEqual(first_pass, second_pass)
 
   def test_tell(self):
     lines = ['line%d\n' % i for i in range(10)]
     tmpfile = self._create_temp_file()
-    writeable = CompressedFile(open(tmpfile, 'w'))
-    current_offset = 0
-    for line in lines:
-      writeable.write(line)
-      current_offset += len(line)
-      self.assertEqual(current_offset, writeable.tell())
-
-    writeable.close()
-    readable = CompressedFile(open(tmpfile))
-    current_offset = 0
-    while True:
-      line = readable.readline()
-      current_offset += len(line)
-      self.assertEqual(current_offset, readable.tell())
-      if not line:
-        break
+    with open(tmpfile, 'w') as f:
+      writeable = CompressedFile(f)
+      current_offset = 0
+      for line in lines:
+        writeable.write(line)
+        current_offset += len(line)
+        self.assertEqual(current_offset, writeable.tell())
+
+    with open(tmpfile) as f:
+      readable = CompressedFile(f)
+      current_offset = 0
+      while True:
+        line = readable.readline()
+        current_offset += len(line)
+        self.assertEqual(current_offset, readable.tell())
+        if not line:
+          break
