Repository: beam Updated Branches: refs/heads/master 55bb423d8 -> d4f9e9268
Adds support for reading concatenated bzip2 files. Adds tests for concatenated gzip and bzip2 files. Removes test 'test_model_textio_gzip_concatenated' in 'snippets_test.py' since it's actually hitting 'DummyReadTransform' and not testing this feature. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d462439 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d462439 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d462439 Branch: refs/heads/master Commit: 5d46243992948ab6d4c9436e353989b49186354b Parents: 55bb423 Author: [email protected] <[email protected]> Authored: Wed Aug 2 22:49:33 2017 -0700 Committer: [email protected] <[email protected]> Committed: Thu Aug 3 11:05:06 2017 -0700 ---------------------------------------------------------------------- .../examples/snippets/snippets_test.py | 16 --- sdks/python/apache_beam/io/filesystem.py | 31 +++-- sdks/python/apache_beam/io/textio_test.py | 115 +++++++++++++++++++ 3 files changed, 129 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5d462439/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 31f71b3..9183d0d 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -589,22 +589,6 @@ class SnippetsTest(unittest.TestCase): snippets.model_textio_compressed( {'read': gzip_file_name}, ['aa', 'bb', 'cc']) - def test_model_textio_gzip_concatenated(self): - temp_path_1 = self.create_temp_file('a\nb\nc\n') - temp_path_2 = self.create_temp_file('p\nq\nr\n') - temp_path_3 = self.create_temp_file('x\ny\nz') - gzip_file_name = temp_path_1 + '.gz' - 
with open(temp_path_1) as src, gzip.open(gzip_file_name, 'wb') as dst: - dst.writelines(src) - with open(temp_path_2) as src, gzip.open(gzip_file_name, 'ab') as dst: - dst.writelines(src) - with open(temp_path_3) as src, gzip.open(gzip_file_name, 'ab') as dst: - dst.writelines(src) - # Add the temporary gzip file to be cleaned up as well. - self.temp_files.append(gzip_file_name) - snippets.model_textio_compressed( - {'read': gzip_file_name}, ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z']) - @unittest.skipIf(datastore_pb2 is None, 'GCP dependencies are not installed') def test_model_datastoreio(self): # We cannot test datastoreio functionality in unit tests therefore we limit http://git-wip-us.apache.org/repos/asf/beam/blob/5d462439/sdks/python/apache_beam/io/filesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 1f65d0a..ef3040c 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -187,29 +187,26 @@ class CompressedFile(object): del buf # Free up some possibly large and no-longer-needed memory. self._read_buffer.write(decompressed) else: - # EOF reached. - # Verify completeness and no corruption and flush (if needed by - # the underlying algorithm). - if self._compression_type == CompressionTypes.BZIP2: - # Having unused_data past end of stream would imply file corruption. - assert not self._decompressor.unused_data, 'Possible file corruption.' - try: - # EOF implies that the underlying BZIP2 stream must also have - # reached EOF. We expect this to raise an EOFError and we catch it - # below. Any other kind of error though would be problematic. - self._decompressor.decompress('dummy') - assert False, 'Possible file corruption.' - except EOFError: - pass # All is as expected! 
- elif self._compression_type == CompressionTypes.GZIP: - # If Gzip file check if there is unused data generated by gzip concat + # EOF of current stream reached. + # + # Any unused compressed data after the end of the stream of a gzip or bzip2 + # file that is not corrupted points to a concatenated compressed + # file. We read concatenated files by iteratively creating decompressor + # objects for the unused compressed data. + if (self._compression_type == CompressionTypes.BZIP2 or + self._compression_type == CompressionTypes.GZIP): if self._decompressor.unused_data != '': buf = self._decompressor.unused_data - self._decompressor = zlib.decompressobj(self._gzip_mask) + self._decompressor = ( + bz2.BZ2Decompressor() + if self._compression_type == CompressionTypes.BZIP2 + else zlib.decompressobj(self._gzip_mask)) decompressed = self._decompressor.decompress(buf) self._read_buffer.write(decompressed) continue else: + # Gzip and bzip2 formats do not require flushing remaining data in the + # decompressor into the read buffer when fully decompressing files.
self._read_buffer.write(self._decompressor.flush()) # Record that we have hit the end of file, so we won't unnecessarily http://git-wip-us.apache.org/repos/asf/beam/blob/5d462439/sdks/python/apache_beam/io/textio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 9a4ec47..8bd7116 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -401,6 +401,64 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp): assert_that(pcoll, equal_to(lines)) pipeline.run() + def test_read_corrupted_bzip2_fails(self): + _, lines = write_data(15) + file_name = self._create_temp_file() + with bz2.BZ2File(file_name, 'wb') as f: + f.write('\n'.join(lines)) + + with open(file_name, 'wb') as f: + f.write('corrupt') + + pipeline = TestPipeline() + pcoll = pipeline | 'Read' >> ReadFromText( + file_name, + compression_type=CompressionTypes.BZIP2) + assert_that(pcoll, equal_to(lines)) + with self.assertRaises(Exception): + pipeline.run() + + def test_read_bzip2_concat(self): + bzip2_file_name1 = self._create_temp_file() + lines = ['a', 'b', 'c'] + with bz2.BZ2File(bzip2_file_name1, 'wb') as dst: + data = '\n'.join(lines) + '\n' + dst.write(data) + + bzip2_file_name2 = self._create_temp_file() + lines = ['p', 'q', 'r'] + with bz2.BZ2File(bzip2_file_name2, 'wb') as dst: + data = '\n'.join(lines) + '\n' + dst.write(data) + + bzip2_file_name3 = self._create_temp_file() + lines = ['x', 'y', 'z'] + with bz2.BZ2File(bzip2_file_name3, 'wb') as dst: + data = '\n'.join(lines) + '\n' + dst.write(data) + + final_bzip2_file = self._create_temp_file() + with open(bzip2_file_name1, 'rb') as src, open( + final_bzip2_file, 'wb') as dst: + dst.writelines(src.readlines()) + + with open(bzip2_file_name2, 'rb') as src, open( + final_bzip2_file, 'ab') as dst: + dst.writelines(src.readlines()) + + with open(bzip2_file_name3, 'rb') as src, 
open( + final_bzip2_file, 'ab') as dst: + dst.writelines(src.readlines()) + + pipeline = TestPipeline() + lines = pipeline | 'ReadFromText' >> beam.io.ReadFromText( + final_bzip2_file, + compression_type=beam.io.filesystem.CompressionTypes.BZIP2) + + expected = ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z'] + assert_that(lines, equal_to(expected)) + pipeline.run() + def test_read_gzip(self): _, lines = write_data(15) file_name = self._create_temp_file() @@ -415,6 +473,63 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp): assert_that(pcoll, equal_to(lines)) pipeline.run() + def test_read_corrupted_gzip_fails(self): + _, lines = write_data(15) + file_name = self._create_temp_file() + with gzip.GzipFile(file_name, 'wb') as f: + f.write('\n'.join(lines)) + + with open(file_name, 'wb') as f: + f.write('corrupt') + + pipeline = TestPipeline() + pcoll = pipeline | 'Read' >> ReadFromText( + file_name, + 0, CompressionTypes.GZIP, + True, coders.StrUtf8Coder()) + assert_that(pcoll, equal_to(lines)) + + with self.assertRaises(Exception): + pipeline.run() + + def test_read_gzip_concat(self): + gzip_file_name1 = self._create_temp_file() + lines = ['a', 'b', 'c'] + with gzip.open(gzip_file_name1, 'wb') as dst: + data = '\n'.join(lines) + '\n' + dst.write(data) + + gzip_file_name2 = self._create_temp_file() + lines = ['p', 'q', 'r'] + with gzip.open(gzip_file_name2, 'wb') as dst: + data = '\n'.join(lines) + '\n' + dst.write(data) + + gzip_file_name3 = self._create_temp_file() + lines = ['x', 'y', 'z'] + with gzip.open(gzip_file_name3, 'wb') as dst: + data = '\n'.join(lines) + '\n' + dst.write(data) + + final_gzip_file = self._create_temp_file() + with open(gzip_file_name1, 'rb') as src, open(final_gzip_file, 'wb') as dst: + dst.writelines(src.readlines()) + + with open(gzip_file_name2, 'rb') as src, open(final_gzip_file, 'ab') as dst: + dst.writelines(src.readlines()) + + with open(gzip_file_name3, 'rb') as src, open(final_gzip_file, 'ab') as dst: + 
dst.writelines(src.readlines()) + + pipeline = TestPipeline() + lines = pipeline | 'ReadFromText' >> beam.io.ReadFromText( + final_gzip_file, + compression_type=beam.io.filesystem.CompressionTypes.GZIP) + + expected = ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z'] + assert_that(lines, equal_to(expected)) + pipeline.run() + def test_read_gzip_large(self): _, lines = write_data(10000) file_name = self._create_temp_file()
