Repository: beam Updated Branches: refs/heads/master b3c36256e -> 336b7f1cf
[BEAM-2497] Fix the reading of concat gzip files Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8dcda6e4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8dcda6e4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8dcda6e4 Branch: refs/heads/master Commit: 8dcda6e40355af13f4d92fcd44aae4539a225a4a Parents: b3c3625 Author: Sourabh Bajaj <[email protected]> Authored: Thu Jun 22 17:08:20 2017 -0700 Committer: Chamikara Jayalath <[email protected]> Committed: Thu Jun 22 22:12:56 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/snippets/snippets_test.py | 16 ++++++++++++++++ sdks/python/apache_beam/io/filesystem.py | 8 ++++++++ 2 files changed, 24 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8dcda6e4/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 9183d0d..31f71b3 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -589,6 +589,22 @@ class SnippetsTest(unittest.TestCase): snippets.model_textio_compressed( {'read': gzip_file_name}, ['aa', 'bb', 'cc']) + def test_model_textio_gzip_concatenated(self): + temp_path_1 = self.create_temp_file('a\nb\nc\n') + temp_path_2 = self.create_temp_file('p\nq\nr\n') + temp_path_3 = self.create_temp_file('x\ny\nz') + gzip_file_name = temp_path_1 + '.gz' + with open(temp_path_1) as src, gzip.open(gzip_file_name, 'wb') as dst: + dst.writelines(src) + with open(temp_path_2) as src, gzip.open(gzip_file_name, 'ab') as dst: + dst.writelines(src) + with open(temp_path_3) as src, gzip.open(gzip_file_name, 'ab') as dst: + dst.writelines(src) + # Add the temporary gzip file to be cleaned up as well. + self.temp_files.append(gzip_file_name) + snippets.model_textio_compressed( + {'read': gzip_file_name}, ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z']) + @unittest.skipIf(datastore_pb2 is None, 'GCP dependencies are not installed') def test_model_datastoreio(self): # We cannot test datastoreio functionality in unit tests therefore we limit http://git-wip-us.apache.org/repos/asf/beam/blob/8dcda6e4/sdks/python/apache_beam/io/filesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index f553026..1f65d0a 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -201,6 +201,14 @@ class CompressedFile(object): assert False, 'Possible file corruption.' except EOFError: pass # All is as expected! + elif self._compression_type == CompressionTypes.GZIP: + # If Gzip file check if there is unused data generated by gzip concat + if self._decompressor.unused_data != '': + buf = self._decompressor.unused_data + self._decompressor = zlib.decompressobj(self._gzip_mask) + decompressed = self._decompressor.decompress(buf) + self._read_buffer.write(decompressed) + continue else: self._read_buffer.write(self._decompressor.flush())
