This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6f7d2fbaeb4 (#25316) Enable LZMA compression in Python SDK I/O (#25317)
6f7d2fbaeb4 is described below
commit 6f7d2fbaeb4a71173df0d9553538ae79c23e6314
Author: William Ross Morrow <[email protected]>
AuthorDate: Mon Feb 13 06:31:01 2023 -0800
(#25316) Enable LZMA compression in Python SDK I/O (#25317)
* (#25316) Added naive first shot at enabling LZMA compression
* (#25316) Added a draft line to CHANGES.md
* (#25316) fix linter issues
* (#25316) update tests (draft)
* (#25316) import order in test file
---
CHANGES.md | 1 +
sdks/python/apache_beam/io/filesystem.py | 21 +++++++++++++++++++--
sdks/python/apache_beam/io/filesystem_test.py | 25 +++++++++++++++++++------
3 files changed, 39 insertions(+), 8 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index d01c764b7ea..10310c6cbef 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -61,6 +61,7 @@
* Support for X source added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
* Added in JmsIO a retry policy for failed publications (Java)
([#24971](https://github.com/apache/beam/issues/24971)).
+* Support for `LZMA` compression/decompression of text files added to the
Python SDK ([#25316](https://github.com/apache/beam/issues/25316))
## New Features / Improvements
diff --git a/sdks/python/apache_beam/io/filesystem.py
b/sdks/python/apache_beam/io/filesystem.py
index fa1f67ac03f..142e04bc295 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -28,6 +28,7 @@ import abc
import bz2
import io
import logging
+import lzma
import os
import posixpath
import re
@@ -65,6 +66,10 @@ class CompressionTypes(object):
# .bz2 (implies BZIP2 as described below).
# .gz (implies GZIP as described below)
# .deflate (implies DEFLATE as described below)
+ # .zst (implies ZSTD as described below)
+ # .zstd (implies ZSTD as described below)
+ # .xz (implies LZMA as described below)
+ # .lzma (implies LZMA as described below)
# Any non-recognized extension implies UNCOMPRESSED as described below.
AUTO = 'auto'
@@ -80,6 +85,9 @@ class CompressionTypes(object):
# GZIP compression (deflate with GZIP headers).
GZIP = 'gzip'
+ # LZMA compression (reads .xz or legacy .lzma streams; always writes .xz format).
+ LZMA = 'lzma'
+
# Uncompressed (i.e., may be split).
UNCOMPRESSED = 'uncompressed'
@@ -92,6 +100,7 @@ class CompressionTypes(object):
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD,
+ CompressionTypes.LZMA,
CompressionTypes.UNCOMPRESSED
])
return compression_type in types
@@ -103,6 +112,7 @@ class CompressionTypes(object):
cls.DEFLATE: 'application/x-deflate',
cls.GZIP: 'application/x-gzip',
cls.ZSTD: 'application/zstd',
+ cls.LZMA: 'application/lzma'
}
return mime_types_by_compression_type.get(compression_type, default)
@@ -114,7 +124,9 @@ class CompressionTypes(object):
'.deflate': cls.DEFLATE,
'.gz': cls.GZIP,
'.zst': cls.ZSTD,
- '.zstd': cls.ZSTD
+ '.zstd': cls.ZSTD,
+ '.xz': cls.LZMA,
+ '.lzma': cls.LZMA
}
lowercased_path = file_path.lower()
for suffix, compression_type in compression_types_by_suffix.items():
@@ -184,6 +196,8 @@ class CompressedFile(object):
# https://github.com/indygreg/python-zstandard/issues/157
self._decompressor = zstandard.ZstdDecompressor(
max_window_size=2147483648).decompressobj()
+ elif self._compression_type == CompressionTypes.LZMA:
+ self._decompressor = lzma.LZMADecompressor()
else:
assert self._compression_type == CompressionTypes.GZIP
self._decompressor = zlib.decompressobj(self._gzip_mask)
@@ -196,6 +210,8 @@ class CompressedFile(object):
zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED)
elif self._compression_type == CompressionTypes.ZSTD:
self._compressor = zstandard.ZstdCompressor().compressobj()
+ elif self._compression_type == CompressionTypes.LZMA:
+ self._compressor = lzma.LZMACompressor()
else:
assert self._compression_type == CompressionTypes.GZIP
self._compressor = zlib.compressobj(
@@ -257,7 +273,8 @@ class CompressedFile(object):
if (self._compression_type == CompressionTypes.BZIP2 or
self._compression_type == CompressionTypes.DEFLATE or
self._compression_type == CompressionTypes.ZSTD or
- self._compression_type == CompressionTypes.GZIP):
+ self._compression_type == CompressionTypes.GZIP or
+ self._compression_type == CompressionTypes.LZMA):
pass
else:
# Deflate, Gzip and bzip2 formats do not require flushing
diff --git a/sdks/python/apache_beam/io/filesystem_test.py
b/sdks/python/apache_beam/io/filesystem_test.py
index 0b1827f9f7d..52f0e502a22 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -22,6 +22,7 @@
import bz2
import gzip
import logging
+import lzma
import ntpath
import os
import posixpath
@@ -316,6 +317,10 @@ atomized in instants hammered around the
compress_open = zstandard.open
with compress_open(file_name, 'wb') as f:
f.write(content)
+ elif compression_type == CompressionTypes.LZMA:
+ compress_open = lzma.open
+ with compress_open(file_name, 'wb') as f:
+ f.write(content)
else:
assert False, "Invalid compression type: %s" % compression_type
@@ -340,7 +345,8 @@ atomized in instants hammered around the
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
- CompressionTypes.ZSTD]:
+ CompressionTypes.ZSTD,
+ CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
@@ -375,7 +381,8 @@ atomized in instants hammered around the
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
- CompressionTypes.ZSTD]:
+ CompressionTypes.ZSTD,
+ CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
@@ -410,7 +417,8 @@ atomized in instants hammered around the
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
- CompressionTypes.ZSTD]:
+ CompressionTypes.ZSTD,
+ CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
@@ -428,7 +436,8 @@ atomized in instants hammered around the
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
- CompressionTypes.ZSTD]:
+ CompressionTypes.ZSTD,
+ CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
@@ -453,7 +462,8 @@ atomized in instants hammered around the
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
- CompressionTypes.ZSTD]:
+ CompressionTypes.ZSTD,
+ CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
@@ -520,6 +530,8 @@ atomized in instants hammered around the
compress_factory = gzip.open
elif compression_type == CompressionTypes.ZSTD:
compress_factory = zstandard.open
+ elif compression_type == CompressionTypes.LZMA:
+ compress_factory = lzma.open
else:
assert False, "Invalid compression type: %s" % compression_type
for line in lines:
@@ -547,7 +559,8 @@ atomized in instants hammered around the
test_lines = tuple(generate_random_line() for i in range(num_test_lines))
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.GZIP,
- CompressionTypes.ZSTD]:
+ CompressionTypes.ZSTD,
+ CompressionTypes.LZMA]:
file_name = create_test_file(compression_type, test_lines)
timer.start()
with open(file_name, 'rb') as f: