This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c960676c78 [python] Introduce backports.zstd dependency to fix 
fastavro reader exception for ManifestListManager (#7834)
c960676c78 is described below

commit c960676c7881fc5b5e634c1df722b3b674894765
Author: Nicholas Jiang <[email protected]>
AuthorDate: Wed May 13 11:35:43 2026 +0800

    [python] Introduce backports.zstd dependency to fix fastavro reader 
exception for ManifestListManager (#7834)
    
    On Python versions below 3.14, fastavro relies on `backports.zstd` for
    the zstandard Avro codec. Without that package, reading zstd-compressed
    manifest lists with `ManifestListManager.read` can fail with a confusing
    “zstandard codec is supported but you need to install backports.zstd”
    style error, especially in slim or Ray-like environments where optional
    dependencies are omitted. This commit therefore adds `backports.zstd`
    (with a `python_version >= "3.9" and python_version < "3.14"` marker) to
    `paimon-python/dev/requirements.txt`, so that local and CI-style dev
    installs include the codec backend needed for zstd manifest list I/O.
---
 paimon-python/dev/requirements.txt                 |   1 +
 .../manifest/manifest_list_zstd_read_subprocess.py |  60 ++++++
 .../tests/manifest/manifest_manager_test.py        | 217 +++++++++++++++++++++
 3 files changed, 278 insertions(+)

diff --git a/paimon-python/dev/requirements.txt 
b/paimon-python/dev/requirements.txt
index 192b2e9add..9cd250500b 100644
--- a/paimon-python/dev/requirements.txt
+++ b/paimon-python/dev/requirements.txt
@@ -34,5 +34,6 @@ pyroaring<=0.4.5; python_version == "3.7"
 pyroaring>=1.0.0; python_version >= "3.8"
 readerwriterlock>=1,<2
 zstandard>=0.19,<1
+backports.zstd>=1.0.0,<1.4.0; python_version >= "3.9" and python_version < 
"3.14"
 cramjam>=1.3.0,<3; python_version>="3.7"
 pyyaml>=5.4,<7
diff --git 
a/paimon-python/pypaimon/tests/manifest/manifest_list_zstd_read_subprocess.py 
b/paimon-python/pypaimon/tests/manifest/manifest_list_zstd_read_subprocess.py
new file mode 100644
index 0000000000..760ad39116
--- /dev/null
+++ 
b/paimon-python/pypaimon/tests/manifest/manifest_list_zstd_read_subprocess.py
@@ -0,0 +1,60 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Test helper: run ``ManifestListManager.read`` as a standalone script and print the number of entries to stdout.
+
+Usage: ``python manifest_list_zstd_read_subprocess.py <warehouse_path> <table_id> <manifest_list_name>``
+(``manifest_list_name`` is the file's basename under the table's manifest dir). Exits 2 on bad usage, 1 on any error (traceback on stderr); pypaimon imports are deferred to ``main()``."""
+
+import sys
+
+
+def main():
+    if len(sys.argv) != 4:  # script name + three positional arguments
+        print(
+            'usage: manifest_list_zstd_read_subprocess.py '
+            '<warehouse_path> <table_id> <manifest_list_name>',
+            file=sys.stderr,
+        )
+        return 2
+    warehouse_path, catalog_table_id, manifest_list_name = (
+        sys.argv[1],
+        sys.argv[2],
+        sys.argv[3],
+    )
+    from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
+    from pypaimon.common.identifier import Identifier
+    from pypaimon.common.options import Options
+    from pypaimon.common.options.config import CatalogOptions
+    from pypaimon.manifest.manifest_list_manager import ManifestListManager
+
+    catalog = FileSystemCatalog(Options({CatalogOptions.WAREHOUSE.key(): 
warehouse_path}))
+    table = catalog.get_table(Identifier.from_string(catalog_table_id))
+    manifest_list_manager = ManifestListManager(table)
+    metas = manifest_list_manager.read(manifest_list_name)
+    print(len(metas))  # the parent test compares this stdout value
+    return 0
+
+
+if __name__ == '__main__':
+    try:
+        raise SystemExit(main())  # SystemExit is not an Exception, so the handler below skips it
+    except Exception:
+        import traceback
+        traceback.print_exc()
+        raise SystemExit(1)
diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py 
b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py
index 60e92407d3..5752f04b88 100644
--- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py
+++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py
@@ -15,7 +15,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import os
+import shutil
+import subprocess
+import sys
 import tempfile
+import threading
 import unittest
 
 import pyarrow as pa
@@ -38,6 +43,141 @@ _EMPTY_ROW = GenericRow([], [])
 _EMPTY_STATS = SimpleStats(min_values=_EMPTY_ROW, max_values=_EMPTY_ROW, 
null_counts=[])
 
 
+def _paimon_python_root():  # tests/manifest -> tests -> pypaimon -> paimon-python
+    return os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 
'..'))
+
+
+def _runner_can_write_zstandard_avro():
+    """Return True if this interpreter's fastavro can write a zstandard Avro block (needs ``backports.zstd`` below Python 3.14)."""
+    try:
+        from io import BytesIO
+        import fastavro
+        buf = BytesIO()
+        fastavro.writer(buf, {'type': 'string'}, ['x'], codec='zstandard')
+        return len(buf.getvalue()) > 0
+    except Exception:
+        return False
+
+
+def _venv_python_executable(venv_dir):  # POSIX layout first, then Windows
+    for rel in (('bin', 'python'), ('bin', 'python3'), ('Scripts', 
'python.exe')):
+        path = os.path.join(venv_dir, *rel)
+        if os.path.isfile(path):
+            return path
+    raise FileNotFoundError('no interpreter under venv: ' + venv_dir)
+
+
+def _subprocess_env_for_pip():
+    """Return a copy of os.environ without ``*PROXY*`` variables (meant to avoid pip SOCKS optional-dependency errors), with pip's version check disabled unless already set."""
+    env = os.environ.copy()
+    for key in list(env):
+        if 'PROXY' in key.upper():
+            env.pop(key, None)
+    env.setdefault('PIP_DISABLE_PIP_VERSION_CHECK', '1')
+    return env
+
+
+_MANIFEST_ZSTD_READ_SUBPROC_VENV_LOCK = threading.Lock()  # guards the two cache globals below
+_MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR = None
+_MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON = None
+
+
+def _manifest_zstd_read_subprocess_venv_python():
+    """Disposable venv with editable pypaimon for ``manifest_list_zstd_read_subprocess.py``; cached until ``tearDownModule``.
+
+    ``backports.zstd`` is not requested here (it may still arrive via pypaimon's requirements); the test uninstalls it
+    with ``python -m pip`` before the first worker run, so the venv must contain pip (hence ``uv venv --seed``).
+    """
+    global _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR, _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON
+    with _MANIFEST_ZSTD_READ_SUBPROC_VENV_LOCK:
+        if _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON is not None:
+            return _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON
+        repo = _paimon_python_root()
+        venv_dir = tempfile.mkdtemp(prefix='paimon-zstd-read-subprocess-')
+        pip_install_env = _subprocess_env_for_pip()
+        uv_bin = shutil.which('uv')
+        try:
+            if uv_bin:
+                subprocess.check_call(
+                    [uv_bin, 'venv', '--seed', venv_dir],  # --seed: uv venvs have no pip by default
+                    stdout=subprocess.DEVNULL,
+                    stderr=subprocess.DEVNULL,
+                    env=pip_install_env,
+                )
+                isolated_venv_python = _venv_python_executable(venv_dir)
+                subprocess.check_call(
+                    [uv_bin, 'pip', 'install', '-q', '--python', isolated_venv_python, '-e', repo, 'requests'],
+                    env=pip_install_env,
+                )
+            else:
+                subprocess.check_call(
+                    [sys.executable, '-m', 'venv', venv_dir],  # stdlib venv bundles pip via ensurepip
+                    stdout=subprocess.DEVNULL,
+                    stderr=subprocess.DEVNULL,
+                )
+                isolated_venv_python = _venv_python_executable(venv_dir)
+                subprocess.check_call(
+                    [isolated_venv_python, '-m', 'pip', 'install', '-q', '-e', repo, 'requests'],
+                    env=pip_install_env,
+                )
+        except Exception:
+            shutil.rmtree(venv_dir, ignore_errors=True)  # don't leak a half-built venv
+            raise
+        _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR = venv_dir
+        _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON = isolated_venv_python
+        return isolated_venv_python
+
+
+def _tear_down_manifest_zstd_read_subprocess_venv():  # delete the cached venv and reset the cache globals
+    global _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR, 
_MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON
+    with _MANIFEST_ZSTD_READ_SUBPROC_VENV_LOCK:
+        if _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR:
+            shutil.rmtree(_MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR, 
ignore_errors=True)
+        _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR = None
+        _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON = None
+
+
+def tearDownModule():  # unittest hook: runs once after all tests in this module
+    _tear_down_manifest_zstd_read_subprocess_venv()
+
+
+def _write_zstandard_manifest_list_file(table, list_name):
+    """Write a one-entry manifest list Avro file named *list_name* into the table's manifest dir, zstandard-compressed (a codec Java may emit)."""
+    from io import BytesIO
+    import fastavro
+    from pypaimon.manifest.manifest_list_manager import ManifestListManager
+    from pypaimon.manifest.schema.manifest_file_meta import (
+        MANIFEST_FILE_META_SCHEMA, ManifestFileMeta)
+    from pypaimon.table.row.generic_row import GenericRowSerializer
+
+    meta = ManifestFileMeta(
+        file_name='manifest.avro', file_size=1024,
+        num_added_files=1, num_deleted_files=0,
+        partition_stats=SimpleStats.empty_stats(), schema_id=0,
+    )
+    avro_record = {  # field names follow MANIFEST_FILE_META_SCHEMA
+        '_VERSION': 2,
+        '_FILE_NAME': meta.file_name,
+        '_FILE_SIZE': meta.file_size,
+        '_NUM_ADDED_FILES': meta.num_added_files,
+        '_NUM_DELETED_FILES': meta.num_deleted_files,
+        '_PARTITION_STATS': {
+            '_MIN_VALUES': 
GenericRowSerializer.to_bytes(meta.partition_stats.min_values),
+            '_MAX_VALUES': 
GenericRowSerializer.to_bytes(meta.partition_stats.max_values),
+            '_NULL_COUNTS': meta.partition_stats.null_counts,
+        },
+        '_SCHEMA_ID': meta.schema_id,
+        '_MIN_ROW_ID': meta.min_row_id,
+        '_MAX_ROW_ID': meta.max_row_id,
+    }
+    buf = BytesIO()
+    fastavro.writer(buf, MANIFEST_FILE_META_SCHEMA, [avro_record], 
codec='zstandard')
+    mlm = ManifestListManager(table)
+    list_path = '{}/{}'.format(mlm.manifest_path, list_name)
+    with table.file_io.new_output_stream(list_path) as out:
+        out.write(buf.getvalue())
+
+
 class _ManifestManagerSetup(unittest.TestCase):
     """Shared setup for manifest manager tests.
 
@@ -191,6 +331,83 @@ class ManifestListManagerTest(_ManifestManagerSetup):
         self.assertEqual(len(result), 1)
         self.assertEqual(result[0].file_name, "manifest-base.avro")
 
+    @unittest.skipIf(
+        sys.version_info < (3, 9),
+        'PyPI backports.zstd only supports Python 3.9–3.13',
+    )
+    @unittest.skipIf(
+        sys.version_info >= (3, 14),
+        'fastavro uses stdlib compression.zstd on Python 3.14+, not backports.zstd',
+    )
+    def test_zstd_manifest_list_fastavro_requires_backports_zstd(self):
+        """Child venv runs ``manifest_list_zstd_read_subprocess`` (argv: warehouse, table id, list file name).
+
+        No ``backports.zstd`` in the venv → read fails; after ``pip install`` (requirements.txt range) → read succeeds.
+        """
+        if not _runner_can_write_zstandard_avro():
+            self.skipTest('runner cannot write zstandard Avro')
+
+        zstd_manifest_list_file_name = 'zstd-manifest-list-backports-integration'
+        _write_zstandard_manifest_list_file(self.table, zstd_manifest_list_file_name)
+
+        manifest_list_manager = ManifestListManager(self.table)
+        self.assertEqual(len(manifest_list_manager.read(zstd_manifest_list_file_name)), 1)
+
+        zstd_manifest_list_abspath = os.path.join(
+            manifest_list_manager.manifest_path, zstd_manifest_list_file_name)
+        self.assertTrue(os.path.isfile(zstd_manifest_list_abspath), msg=zstd_manifest_list_abspath)
+
+        zstd_read_subprocess_script = os.path.join(
+            os.path.dirname(__file__), 'manifest_list_zstd_read_subprocess.py')
+        catalog_table_id = 'default.{}'.format(self._table_name)
+        isolated_venv_python = _manifest_zstd_read_subprocess_venv_python()
+        pip_install_env = _subprocess_env_for_pip()
+        subprocess.check_call(  # must succeed (pip exits 0 if not installed), else the no-backend run is meaningless
+            [isolated_venv_python, '-m', 'pip', 'uninstall', '-y', 'backports.zstd'],
+            stdout=subprocess.DEVNULL,
+            stderr=subprocess.DEVNULL,
+            env=pip_install_env,
+        )
+
+        read_without_zstd_backend = subprocess.run(
+            [
+                isolated_venv_python,
+                zstd_read_subprocess_script,
+                self.catalog.warehouse,
+                catalog_table_id,
+                zstd_manifest_list_file_name,
+            ],
+            capture_output=True,
+            text=True,
+        )
+        self.assertNotEqual(
+            read_without_zstd_backend.returncode, 0,
+            msg=read_without_zstd_backend.stdout + read_without_zstd_backend.stderr)
+        stderr_and_stdout = (
+            read_without_zstd_backend.stdout + read_without_zstd_backend.stderr)
+        self.assertIn('zstandard codec is supported but you need to install', stderr_and_stdout)
+        self.assertIn('backports.zstd', stderr_and_stdout)
+
+        subprocess.check_call(  # same version range as dev/requirements.txt
+            [isolated_venv_python, '-m', 'pip', 'install', '-q', 'backports.zstd>=1.0.0,<1.4.0'],
+            env=pip_install_env,
+        )
+        read_with_zstd_backend = subprocess.run(
+            [
+                isolated_venv_python,
+                zstd_read_subprocess_script,
+                self.catalog.warehouse,
+                catalog_table_id,
+                zstd_manifest_list_file_name,
+            ],
+            capture_output=True,
+            text=True,
+        )
+        self.assertEqual(
+            read_with_zstd_backend.returncode, 0,
+            msg=read_with_zstd_backend.stdout + read_with_zstd_backend.stderr)
+        self.assertEqual(read_with_zstd_backend.stdout.strip(), '1')
+
 
 if __name__ == '__main__':
     unittest.main()

Reply via email to