Jack Whelpton created BEAM-11282:
------------------------------------
Summary: Cannot set compression level when writing compressed files
Key: BEAM-11282
URL: https://issues.apache.org/jira/browse/BEAM-11282
Project: Beam
Issue Type: Improvement
Components: sdk-py-core
Affects Versions: 2.25.0
Reporter: Jack Whelpton
CompressedFile._initialize_compressor hardcodes the compression level used when
writing:
    self._compressor = zlib.compressobj(
        zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, self._gzip_mask)
It would be good to be able to control this: I have a large set of
GZIP-compressed files, and writing the same data back produces output 10x
larger than the input.
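To make the effect of the level argument concrete, here is a minimal standard-library sketch that compresses one payload at several zlib levels and compares the resulting sizes. The helper names (gzip_bytes, sizes_by_level) and the sample payload are hypothetical, and GZIP_WBITS is assumed to match the value behind self._gzip_mask. This is not Beam code and does not reproduce the 10x difference reported here.

```python
import zlib

# wbits of MAX_WBITS | 16 asks zlib for gzip framing; assumed here to
# correspond to the value Beam stores in self._gzip_mask.
GZIP_WBITS = zlib.MAX_WBITS | 16


def gzip_bytes(data: bytes, level: int = zlib.Z_DEFAULT_COMPRESSION) -> bytes:
    """Hypothetical helper: compress data into a gzip stream at the given level."""
    compressor = zlib.compressobj(level, zlib.DEFLATED, GZIP_WBITS)
    return compressor.compress(data) + compressor.flush()


def sizes_by_level(data: bytes, levels=(0, 1, 6, 9)) -> dict:
    """Return the compressed size in bytes for each requested level."""
    return {level: len(gzip_bytes(data, level)) for level in levels}


# Illustrative TSV-like payload, not the reporter's data.
sample = b"".join(b"%d\tuser_%d\tactive\n" % (i, i % 50) for i in range(5000))

sizes = sizes_by_level(sample)
for level, size in sizes.items():
    print(f"level {level}: {size} bytes (input: {len(sample)} bytes)")
```

Level 0 stores the data uncompressed, so its output is slightly larger than the input, while zlib.Z_BEST_COMPRESSION is level 9. A configurable writer would only need to pass a caller-supplied level as the first argument to zlib.compressobj.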
I've tried various monkeypatching approaches. They seem to work with the local
runner but fail with DataflowRunner. For example:
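    import apache_beam as beam
    from apache_beam.io import WriteToText
    from apache_beam.io.filesystem import CompressedFile


    class WriteData(beam.PTransform):
        def __init__(self, dst):
            import zlib
            self._dst = dst

            # Replacement that requests the highest compression level.
            def _initialize_compressor(self):
                self._compressor = zlib.compressobj(
                    zlib.Z_BEST_COMPRESSION, zlib.DEFLATED, self._gzip_mask
                )

            # The patch is applied when the transform is constructed.
            CompressedFile._initialize_compressor = _initialize_compressor

        def expand(self, p):
            return p | WriteToText(
                file_path_prefix=self._dst,
                file_name_suffix=".tsv.gz",
                compression_type="gzip",
            )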