commit:     8bd06dafd1fb7da203d7037bfc6eef2fe11123b2
Author:     Sam James <sam <AT> gentoo <DOT> org>
AuthorDate: Sat Sep 24 23:26:52 2022 +0000
Commit:     Arthur Zamarin <arthurzam <AT> gentoo <DOT> org>
CommitDate: Mon Oct 10 16:48:52 2022 +0000
URL:        https://gitweb.gentoo.org/proj/pkgcore/snakeoil.git/commit/?id=8bd06daf

compression: add parallel xz support

Closes: https://github.com/pkgcore/snakeoil/issues/83
Signed-off-by: Sam James <sam <AT> gentoo.org>
Signed-off-by: Arthur Zamarin <arthurzam <AT> gentoo.org>

 src/snakeoil/compression/__init__.py | 17 ++++----
 src/snakeoil/compression/_xz.py      | 74 +++++++++++++++++++++++++++++++
 tests/compression/__init__.py        | 70 +++++++++++++++++++++++++++++
 tests/compression/test_bzip2.py      | 85 ++++++------------------------------
 tests/compression/test_init.py       |  4 +-
 tests/compression/test_xz.py         | 56 ++++++++++++++++++++++++
 6 files changed, 225 insertions(+), 81 deletions(-)

diff --git a/src/snakeoil/compression/__init__.py b/src/snakeoil/compression/__init__.py
index 4b4a437c..580a70a2 100644
--- a/src/snakeoil/compression/__init__.py
+++ b/src/snakeoil/compression/__init__.py
@@ -1,3 +1,4 @@
+import multiprocessing
 import shlex
 from functools import cached_property
 from importlib import import_module
@@ -33,7 +34,7 @@ class _transform_source:
         return self.module.decompress_handle(handle, parallelize=parallelize)
 
 
-_transforms = {name: _transform_source(name) for name in ('bzip2',)}
+_transforms = {name: _transform_source(name) for name in ('bzip2', 'xz')}  # compressor name -> _transform_source wrapper around its backend module
 
 
 def compress_data(compressor_type, data, level=9, **kwds):
@@ -154,13 +155,13 @@ class _Tar(_Archive, ArComp):
         if self.compress_binary is not None:
             for b in self.compress_binary:
                 try:
-                    process.find_binary(b)
-                    cmd += f' --use-compress-program={b}'
+                    process.find_binary(b[0])
+                    cmd += f' --use-compress-program="{" ".join(b)}"'
                     break
                 except process.CommandNotFound:
                     pass
             else:
-                choices = ', '.join(self.compress_binary)
+                choices = ', '.join(next(zip(*self.compress_binary)))
                 raise ArCompError(
                     'no compression binary found from the '
                     f'following choices: {choices}')
@@ -170,25 +171,25 @@ class _Tar(_Archive, ArComp):
 class _TarGZ(_Tar):
 
     exts = frozenset(['.tar.gz', '.tgz', '.tar.Z', '.tar.z'])
-    compress_binary = ('pigz', 'gzip')
+    compress_binary = (('pigz',), ('gzip',))  # (program, *args) candidates; the first one found on PATH is used
 
 
 class _TarBZ2(_Tar):
 
     exts = frozenset(['.tar.bz2', '.tbz2', '.tbz'])
-    compress_binary = ('lbzip2', 'pbzip2', 'bzip2')
+    compress_binary = (('lbzip2',), ('pbzip2',), ('bzip2',))  # (program, *args) candidates, tried in this order
 
 
 class _TarLZMA(_Tar):
 
     exts = frozenset(['.tar.lzma'])
-    compress_binary = ('lzma',)
+    compress_binary = (('lzma',),)  # one (program, *args) candidate; the outer trailing comma is required, (('lzma',)) is just ('lzma',)
 
 
 class _TarXZ(_Tar):
 
     exts = frozenset(['.tar.xz', '.txz'])
-    compress_binary = ('pixz', 'xz')
+    compress_binary = (('pixz',), ('xz', f'-T{multiprocessing.cpu_count()}'))  # prefer pixz, else xz with one thread per CPU (count taken once, at class creation)
 
 
 class _Zip(_Archive, ArComp):

diff --git a/src/snakeoil/compression/_xz.py b/src/snakeoil/compression/_xz.py
new file mode 100644
index 00000000..47077379
--- /dev/null
+++ b/src/snakeoil/compression/_xz.py
@@ -0,0 +1,109 @@
+"""
+xz decompression/compression
+
+Where possible, this module defers to cpython's lzma module - if it's not available,
+it defers to executing xz with tempfile arguments to do decompression
+and compression.
+
+When parallelization is requested, the xz binary is always used, with one
+thread per CPU (``-T<cpu_count>``).
+
+Use this module unless it's absolutely critical that lzma module be used.
+"""
+
+__all__ = ("compress_data", "decompress_data")
+
+import multiprocessing
+from functools import partial
+
+from .. import process
+from ..compression import _util
+
+# Unused import
+# pylint: disable=W0611
+
+# Resolve xz at import time: if it can't be found, process.find_binary
+# raises CommandNotFound and importing this module fails.
+xz_path = process.find_binary("xz")
+# -T<N> asks xz for N worker threads (one per CPU); decompression reuses it.
+xz_compress_args = (f'-T{multiprocessing.cpu_count()}',)
+xz_decompress_args = xz_compress_args
+parallelizable = True
+
+try:
+    from lzma import LZMAFile
+    from lzma import compress as _compress_data
+    from lzma import decompress as _decompress_data
+    native = True
+except ImportError:
+
+    # We need this because if we are not native then TarFile.open will fail
+    # (and some code needs to be able to check that).
+    native = False
+
+    _compress_data = partial(_util.compress_data, xz_path)
+    _decompress_data = partial(_util.decompress_data, xz_path)
+
+# Binary-backed handle helpers, used whenever the native LZMAFile path does
+# not apply (no lzma module, or the handle is not a path string).
+_compress_handle = partial(_util.compress_handle, xz_path)
+_decompress_handle = partial(_util.decompress_handle, xz_path)
+
+
+def compress_data(data, level=9, parallelize=False):
+    """Compress ``data`` with xz at compression ``level``.
+
+    With ``parallelize``, the xz binary is run with ``xz_compress_args``;
+    otherwise the lzma module is used if importable, else the xz binary.
+    """
+    if parallelize and parallelizable:
+        return _util.compress_data(xz_path, data, compresslevel=level,
+                                   extra_args=xz_compress_args)
+    if native:
+        # lzma.compress takes the level as ``preset``.
+        return _compress_data(data, preset=level)
+    return _compress_data(data, compresslevel=level)
+
+
+def decompress_data(data, parallelize=False):
+    """Decompress xz-compressed ``data``.
+
+    With ``parallelize``, the xz binary is run with ``xz_decompress_args``;
+    otherwise the lzma module is used if importable, else the xz binary.
+    """
+    if parallelize and parallelizable:
+        return _util.decompress_data(xz_path, data,
+                                     extra_args=xz_decompress_args)
+    return _decompress_data(data)
+
+
+def compress_handle(handle, level=9, parallelize=False):
+    """Return a writable stream that xz-compresses into ``handle``.
+
+    ``handle`` may be a path string, a file object or a file descriptor.
+    A path string is opened with lzma.LZMAFile when the lzma module is
+    available and ``parallelize`` is off; every other case runs the xz
+    binary.
+    """
+    if parallelize and parallelizable:
+        return _util.compress_handle(xz_path, handle, compresslevel=level,
+                                     extra_args=xz_compress_args)
+    if native and isinstance(handle, str):
+        return LZMAFile(handle, mode='w', preset=level)
+    return _compress_handle(handle, compresslevel=level)
+
+
+def decompress_handle(handle, parallelize=False):
+    """Return a readable stream of the decompressed contents of ``handle``.
+
+    ``handle`` may be a path string, a file object or a file descriptor.
+    A path string is opened with lzma.LZMAFile when the lzma module is
+    available and ``parallelize`` is off; every other case runs the xz
+    binary.
+    """
+    if parallelize and parallelizable:
+        return _util.decompress_handle(xz_path, handle,
+                                       extra_args=xz_decompress_args)
+    if native and isinstance(handle, str):
+        return LZMAFile(handle, mode='r')
+    return _decompress_handle(handle)

diff --git a/tests/compression/__init__.py b/tests/compression/__init__.py
index 178d4b64..3b70dcba 100644
--- a/tests/compression/__init__.py
+++ b/tests/compression/__init__.py
@@ -1,5 +1,7 @@
 from unittest.mock import patch
 
+import pytest
+from snakeoil import compression
 from snakeoil.process import CommandNotFound, find_binary
 
 def hide_binary(*binaries: str):
@@ -9,3 +11,71 @@ def hide_binary(*binaries: str):
         return find_binary(name)
 
     return patch('snakeoil.process.find_binary', side_effect=mock_find_binary)
+
+
+class Base:  # shared compression tests; subclasses fill in the class attributes below
+
+    module: str = ''  # compressor name passed to snakeoil.compression, e.g. 'bzip2' or 'xz'
+    decompressed_test_data: bytes = b''  # plain payload
+    compressed_test_data: bytes = b''  # decompressed_test_data compressed in the `module` format
+
+    def decompress(self, data: bytes) -> bytes:  # subclasses supply an independent decompressor to check output
+        raise NotImplementedError(self, 'decompress')
+
+    @pytest.mark.parametrize('parallelize', (True, False))
+    @pytest.mark.parametrize('level', (1, 9))
+    def test_compress_data(self, level, parallelize):
+        compressed = compression.compress_data(self.module, self.decompressed_test_data, level=level, parallelize=parallelize)
+        assert compressed
+        assert self.decompress(compressed) == self.decompressed_test_data
+
+    @pytest.mark.parametrize('parallelize', (True, False))
+    def test_decompress_data(self, parallelize):
+        assert self.decompressed_test_data == compression.decompress_data(self.module, self.compressed_test_data, parallelize=parallelize)
+
+    @pytest.mark.parametrize('parallelize', (True, False))
+    @pytest.mark.parametrize('level', (1, 9))
+    def test_compress_handle(self, tmp_path, level, parallelize):
+        path = tmp_path / f'test.{self.module}'
+
+        stream = compression.compress_handle(self.module, str(path), level=level, parallelize=parallelize)
+        stream.write(self.decompressed_test_data)
+        stream.close()
+        assert self.decompress(path.read_bytes()) == self.decompressed_test_data
+
+        with path.open("wb") as file:
+            stream = compression.compress_handle(self.module, file, level=level, parallelize=parallelize)
+            stream.write(self.decompressed_test_data)
+            stream.close()
+            assert self.decompress(path.read_bytes()) == self.decompressed_test_data
+
+        with path.open("wb") as file:
+            stream = compression.compress_handle(self.module, file.fileno(), level=level, parallelize=parallelize)
+            stream.write(self.decompressed_test_data)
+            stream.close()
+            assert self.decompress(path.read_bytes()) == self.decompressed_test_data
+
+        with pytest.raises(TypeError):
+            compression.compress_handle(self.module, b'', level=level, parallelize=parallelize)
+
+    @pytest.mark.parametrize('parallelize', (True, False))
+    def test_decompress_handle(self, tmp_path, parallelize):
+        path = tmp_path / f'test.{self.module}'
+        path.write_bytes(self.compressed_test_data)
+
+        stream = compression.decompress_handle(self.module, str(path), parallelize=parallelize)
+        assert stream.read() == self.decompressed_test_data
+        stream.close()
+
+        with path.open("rb") as file:
+            stream = compression.decompress_handle(self.module, file, parallelize=parallelize)
+            assert stream.read() == self.decompressed_test_data
+            stream.close()
+
+        with path.open("rb") as file:
+            stream = compression.decompress_handle(self.module, file.fileno(), parallelize=parallelize)
+            assert stream.read() == self.decompressed_test_data
+            stream.close()
+
+        with pytest.raises(TypeError):
+            compression.decompress_handle(self.module, b'', parallelize=parallelize)

diff --git a/tests/compression/test_bzip2.py b/tests/compression/test_bzip2.py
index d11d61fa..f3093d09 100644
--- a/tests/compression/test_bzip2.py
+++ b/tests/compression/test_bzip2.py
@@ -1,13 +1,12 @@
 import importlib
 from bz2 import decompress
-from pathlib import Path
 
 import pytest
 from snakeoil.compression import _bzip2
 from snakeoil.process import CommandNotFound, find_binary
 from snakeoil.test import hide_imports
 
-from . import hide_binary
+from . import Base, hide_binary
 
 
 def test_no_native():
@@ -27,76 +26,20 @@ def test_missing_lbzip2_binary():
         importlib.reload(_bzip2)
         assert not _bzip2.parallelizable
 
+class Bzip2Base(Base):  # bzip2 payload and reference decompressor for the shared Base tests
 
-decompressed_test_data = b'Some text here\n'
-compressed_test_data = (
-    b'BZh91AY&SY\x1bM\x00\x02\x00\x00\x01\xd3\x80\x00\x10@\x00\x08\x00\x02'
-    b'B\x94@ \x00"\r\x03\xd4\x0c \t!\x1b\xb7\x80u/\x17rE8P\x90\x1bM\x00\x02'
-)
+    module = 'bzip2'
+    decompressed_test_data = b'Some text here\n'
+    compressed_test_data = (
+        b'BZh91AY&SY\x1bM\x00\x02\x00\x00\x01\xd3\x80\x00\x10@\x00\x08\x00\x02'
+        b'B\x94@ \x00"\r\x03\xd4\x0c \t!\x1b\xb7\x80u/\x17rE8P\x90\x1bM\x00\x02'
+    )
 
+    def decompress(self, data: bytes) -> bytes:
+        return decompress(data)
 
-class Base:
 
-    @pytest.mark.parametrize('parallelize', (True, False))
-    @pytest.mark.parametrize('level', (1, 9))
-    def test_compress_data(self, level, parallelize):
-        compressed = _bzip2.compress_data(decompressed_test_data, level=level, parallelize=parallelize)
-        assert compressed
-        assert decompress(compressed) == decompressed_test_data
-
-    @pytest.mark.parametrize('parallelize', (True, False))
-    def test_decompress_data(self, parallelize):
-        assert decompressed_test_data == _bzip2.decompress_data(compressed_test_data, parallelize=parallelize)
-
-    @pytest.mark.parametrize('parallelize', (True, False))
-    @pytest.mark.parametrize('level', (1, 9))
-    def test_compress_handle(self, tmp_path, level, parallelize):
-        path = tmp_path / 'test.bz2'
-
-        stream = _bzip2.compress_handle(str(path), level=level, parallelize=parallelize)
-        stream.write(decompressed_test_data)
-        stream.close()
-        assert decompress(path.read_bytes()) == decompressed_test_data
-
-        with path.open("wb") as file:
-            stream = _bzip2.compress_handle(file, level=level, parallelize=parallelize)
-            stream.write(decompressed_test_data)
-            stream.close()
-            assert decompress(path.read_bytes()) == decompressed_test_data
-
-        with path.open("wb") as file:
-            stream = _bzip2.compress_handle(file.fileno(), level=level, parallelize=parallelize)
-            stream.write(decompressed_test_data)
-            stream.close()
-            assert decompress(path.read_bytes()) == decompressed_test_data
-
-        with pytest.raises(TypeError):
-            _bzip2.compress_handle(b'', level=level, parallelize=parallelize)
-
-    @pytest.mark.parametrize('parallelize', (True, False))
-    def test_decompress_handle(self, tmp_path, parallelize):
-        path: Path = tmp_path / 'test.bz2'
-        path.write_bytes(compressed_test_data)
-
-        stream = _bzip2.decompress_handle(str(path), parallelize=parallelize)
-        assert stream.read() == decompressed_test_data
-        stream.close()
-
-        with path.open("rb") as file:
-            stream = _bzip2.decompress_handle(file, parallelize=parallelize)
-            assert stream.read() == decompressed_test_data
-            stream.close()
-
-        with path.open("rb") as file:
-            stream = _bzip2.decompress_handle(file.fileno(), parallelize=parallelize)
-            assert stream.read() == decompressed_test_data
-            stream.close()
-
-        with pytest.raises(TypeError):
-            _bzip2.decompress_handle(b'', parallelize=parallelize)
-
-
-class TestStdlib(Base):
+class TestStdlib(Bzip2Base):
 
     @pytest.fixture(autouse=True, scope='class')
     def _setup(self):
@@ -109,7 +52,7 @@ class TestStdlib(Base):
             yield
 
 
-class TestBzip2(Base):
+class TestBzip2(Bzip2Base):
 
     @pytest.fixture(autouse=True, scope='class')
     def _setup(self):
@@ -118,7 +61,7 @@ class TestBzip2(Base):
             yield
 
 
-class TestLbzip2(Base):
+class TestLbzip2(Bzip2Base):
 
     @pytest.fixture(autouse=True, scope='class')
     def _setup(self):
@@ -130,4 +73,4 @@ class TestLbzip2(Base):
 
     def test_bad_level(self):
         with pytest.raises(ValueError, match='unknown option "-0"'):
-            _bzip2.compress_data(decompressed_test_data, level=90, parallelize=True)
+            _bzip2.compress_data(self.decompressed_test_data, level=90, parallelize=True)  # lbzip2 rejecting the level must surface as ValueError

diff --git a/tests/compression/test_init.py b/tests/compression/test_init.py
index 0726571b..f3a40270 100644
--- a/tests/compression/test_init.py
+++ b/tests/compression/test_init.py
@@ -70,14 +70,14 @@ class TestArComp:
         assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
 
     def test_fallback_tbz2(self, tmp_path, tbz2_file):
-        with hide_binary(*_TarBZ2.compress_binary[:-1]):
+        with hide_binary(*(b[0] for b in _TarBZ2.compress_binary[:-1])):  # hide every candidate program except the last
             with chdir(tmp_path):
                 ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path)
             assert (tmp_path / 'file1').read_text() == 'Hello world'
             assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
 
     def test_no_fallback_tbz2(self, tmp_path, tbz2_file):
-        with hide_binary(*_TarBZ2.compress_binary), chdir(tmp_path):
+        with hide_binary(*(b[0] for b in _TarBZ2.compress_binary)), chdir(tmp_path):  # hide every candidate program
             with pytest.raises(ArCompError, match='no compression binary'):
                 ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path)
 

diff --git a/tests/compression/test_xz.py b/tests/compression/test_xz.py
new file mode 100644
index 00000000..f8417b30
--- /dev/null
+++ b/tests/compression/test_xz.py
@@ -0,0 +1,61 @@
+import importlib
+from lzma import decompress
+
+import pytest
+from snakeoil.compression import _xz
+from snakeoil.process import CommandNotFound, find_binary
+from snakeoil.test import hide_imports
+
+from . import Base, hide_binary
+
+
+def test_no_native():
+    """Without an importable lzma module, _xz reports itself as non-native."""
+    with hide_imports('lzma'):
+        importlib.reload(_xz)
+        assert not _xz.native
+
+
+def test_missing_xz_binary():
+    """Importing _xz fails with CommandNotFound when no xz binary exists."""
+    with hide_binary('xz'):
+        with pytest.raises(CommandNotFound, match='xz'):
+            importlib.reload(_xz)
+
+
+class XzBase(Base):
+    """xz payload, its compressed form and a stdlib reference decompressor."""
+
+    module = 'xz'
+    decompressed_test_data = b'Some text here\n' * 2
+    compressed_test_data = (
+        b'\xfd7zXZ\x00\x00\x04\xe6\xd6\xb4F\x04\xc0\x1e\x1e!\x01\x16\x00\x00\x00'
+        b'\x00\x00\x00\x00\x00\x00j\xf6\x947\xe0\x00\x1d\x00\x16]\x00)\x9b\xc9\xa6g'
+        b'Bw\x8c\xb3\x9eA\x9a\xbeT\xc9\xfa\xe3\x19\x8f(\x00\x00\x00\x00\x00\x96N'
+        b'\xa8\x8ed\xa2WH\x00\x01:\x1e1V \xff\x1f\xb6\xf3}\x01\x00\x00\x00\x00\x04YZ'
+    )
+
+    def decompress(self, data: bytes) -> bytes:
+        return decompress(data)
+
+
+class TestStdlib(XzBase):
+    """Shared tests with lzma importable (native path for non-parallel calls)."""
+
+    @pytest.fixture(autouse=True, scope='class')
+    def _setup(self):
+        try:
+            find_binary('xz')
+        except CommandNotFound:
+            pytest.skip('xz binary not found')
+        importlib.reload(_xz)
+
+
+class TestXz(XzBase):
+    """Shared tests with lzma hidden, forcing every call through the xz binary."""
+
+    @pytest.fixture(autouse=True, scope='class')
+    def _setup(self):
+        with hide_imports('lzma'):
+            importlib.reload(_xz)
+            yield

Reply via email to