This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c960676c78 [python] Introduce backports.zstd dependency to fix
fastavro reader exception for ManifestListManager (#7834)
c960676c78 is described below
commit c960676c7881fc5b5e634c1df722b3b674894765
Author: Nicholas Jiang <[email protected]>
AuthorDate: Wed May 13 11:35:43 2026 +0800
[python] Introduce backports.zstd dependency to fix fastavro reader
exception for ManifestListManager (#7834)
On Python versions below 3.14, fastavro uses `backports.zstd` for the
zstandard Avro codec. Without that package, reading zstd-compressed
manifest lists with `ManifestListManager.read` can fail with a confusing
“missing codec / install backports.zstd” style error, especially in slim
or Ray-like environments where optional dependencies are omitted. This
commit therefore adds `backports.zstd` (restricted by an environment marker
to Python 3.9–3.13, the versions the package supports) to
`paimon-python/dev/requirements.txt`, so that local and CI-style dev
installs include the codec backend needed for zstd manifest list I/O.
A regression test reads a zstandard-compressed manifest list in an
isolated venv, first without and then with `backports.zstd` installed.
---
paimon-python/dev/requirements.txt | 1 +
.../manifest/manifest_list_zstd_read_subprocess.py | 60 ++++++
.../tests/manifest/manifest_manager_test.py | 217 +++++++++++++++++++++
3 files changed, 278 insertions(+)
diff --git a/paimon-python/dev/requirements.txt b/paimon-python/dev/requirements.txt
index 192b2e9add..9cd250500b 100644
--- a/paimon-python/dev/requirements.txt
+++ b/paimon-python/dev/requirements.txt
@@ -34,5 +34,6 @@ pyroaring<=0.4.5; python_version == "3.7"
pyroaring>=1.0.0; python_version >= "3.8"
readerwriterlock>=1,<2
zstandard>=0.19,<1
+backports.zstd>=1.0.0,<1.4.0; python_version >= "3.9" and python_version < "3.14"
cramjam>=1.3.0,<3; python_version>="3.7"
pyyaml>=5.4,<7
diff --git a/paimon-python/pypaimon/tests/manifest/manifest_list_zstd_read_subprocess.py b/paimon-python/pypaimon/tests/manifest/manifest_list_zstd_read_subprocess.py
new file mode 100644
index 0000000000..760ad39116
--- /dev/null
+++ b/paimon-python/pypaimon/tests/manifest/manifest_list_zstd_read_subprocess.py
@@ -0,0 +1,60 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Test helper: subprocess ``ManifestListManager.read``; prints entry count on
stdout.
+
+Usage: ``python manifest_list_zstd_read_subprocess.py <warehouse> <table_id>
<list_file_name>``
+(``list_file_name`` = basename under the table manifest dir). Imports live in
``main()``."""
+
+import sys
+
+
def main():
    """Read one manifest list and print how many entries it contains.

    Expects exactly three positional command-line arguments: the warehouse
    path, a table identifier string (e.g. ``default.my_table``), and the
    manifest list file name.

    Returns:
        0 after printing the entry count, or 2 (with a usage line on stderr)
        when the argument count is wrong.
    """
    args = sys.argv[1:]
    if len(args) != 3:
        print(
            'usage: manifest_list_zstd_read_subprocess.py '
            '<warehouse_path> <table_id> <manifest_list_name>',
            file=sys.stderr,
        )
        return 2
    warehouse_path, catalog_table_id, manifest_list_name = args

    # pypaimon is imported only after the arguments have been validated.
    from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
    from pypaimon.common.identifier import Identifier
    from pypaimon.common.options import Options
    from pypaimon.common.options.config import CatalogOptions
    from pypaimon.manifest.manifest_list_manager import ManifestListManager

    catalog_options = Options({CatalogOptions.WAREHOUSE.key(): warehouse_path})
    table = FileSystemCatalog(catalog_options).get_table(
        Identifier.from_string(catalog_table_id))
    entries = ManifestListManager(table).read(manifest_list_name)
    print(len(entries))
    return 0
+
+
if __name__ == '__main__':
    # Any failure in main() is reported as a traceback plus exit status 1 so the
    # parent process can inspect stderr; main()'s own return value is the
    # exit status otherwise.
    try:
        exit_status = main()
    except Exception:
        import traceback
        traceback.print_exc()
        exit_status = 1
    raise SystemExit(exit_status)
diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py
index 60e92407d3..5752f04b88 100644
--- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py
+++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py
@@ -15,7 +15,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
+import os
+import shutil
+import subprocess
+import sys
import tempfile
+import threading
import unittest
import pyarrow as pa
@@ -38,6 +43,141 @@ _EMPTY_ROW = GenericRow([], [])
# Statistics whose min/max values are the empty row and with no null counts.
_EMPTY_STATS = SimpleStats(min_values=_EMPTY_ROW, max_values=_EMPTY_ROW, null_counts=[])
def _paimon_python_root():
    """Return the absolute path of the ``paimon-python`` project directory.

    This module sits in ``paimon-python/pypaimon/tests/manifest/``, so the
    project root is three directory levels above it.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.abspath(os.path.join(module_dir, os.pardir, os.pardir, os.pardir))
+
+
def _runner_can_write_zstandard_avro():
    """Report whether this interpreter's fastavro can write zstandard Avro blocks.

    fastavro needs ``backports.zstd`` (Python < 3.14) to *write* zstandard
    blocks. Any failure while probing, such as fastavro or the codec backend
    being unavailable, yields False.
    """
    try:
        from io import BytesIO
        import fastavro
        sink = BytesIO()
        fastavro.writer(sink, {'type': 'string'}, ['x'], codec='zstandard')
        payload = sink.getvalue()
    except Exception:
        return False
    return bool(payload)
+
+
def _venv_python_executable(venv_dir):
    """Return the path of the Python interpreter inside ``venv_dir``.

    POSIX layouts (``bin/python``, then ``bin/python3``) are checked before the
    Windows layout (``Scripts/python.exe``); the first existing file wins.

    Raises:
        FileNotFoundError: if none of the candidate paths is a regular file.
    """
    candidates = (
        os.path.join(venv_dir, 'bin', 'python'),
        os.path.join(venv_dir, 'bin', 'python3'),
        os.path.join(venv_dir, 'Scripts', 'python.exe'),
    )
    interpreter = next((c for c in candidates if os.path.isfile(c)), None)
    if interpreter is None:
        raise FileNotFoundError('no interpreter under venv: ' + venv_dir)
    return interpreter
+
+
def _subprocess_env_for_pip():
    """Build the environment used for venv/pip subprocesses.

    Starts from a copy of ``os.environ`` and drops every variable whose name
    contains ``PROXY`` (case-insensitive), which avoids SOCKS optional-dependency
    errors. It also disables pip's version check unless the caller already set
    ``PIP_DISABLE_PIP_VERSION_CHECK``. ``os.environ`` itself is not modified.
    """
    env = {name: value for name, value in os.environ.items()
           if 'PROXY' not in name.upper()}
    env.setdefault('PIP_DISABLE_PIP_VERSION_CHECK', '1')
    return env
+
+
# Process-wide cache for the disposable venv used by the zstd read-subprocess
# test. It is filled lazily on first use and cleared by the module tear-down
# helper; both paths are read and written only while holding the lock.
_MANIFEST_ZSTD_READ_SUBPROC_VENV_LOCK = threading.Lock()
_MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR = _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON = None
+
+
def _manifest_zstd_read_subprocess_venv_python():
    """Return the interpreter of a disposable venv with an editable ``pypaimon``.

    The venv runs ``manifest_list_zstd_read_subprocess.py`` in isolation from
    the runner's own site-packages. It is created on the first call under
    ``_MANIFEST_ZSTD_READ_SUBPROC_VENV_LOCK`` and cached in the module-level
    globals; later calls reuse it until
    ``_tear_down_manifest_zstd_read_subprocess_venv`` removes it.

    ``backports.zstd`` is not requested explicitly, so a worker run can hit
    fastavro's missing-zstd-codec path when reading zstandard-compressed
    manifest lists. NOTE(review): if pypaimon's own install requirements
    include ``backports.zstd``, the editable install pulls it in anyway.
    Callers that need it absent should uninstall it first.

    The venv uses the same interpreter as the test runner (so version-based
    skips evaluated in the runner also hold in the child). It always contains
    ``pip``, so callers can manage packages with ``<venv python> -m pip ...``.

    Returns:
        Path of the venv's Python executable.

    Raises:
        subprocess.CalledProcessError: if creating the venv or installing into
            it fails. The partially built venv directory is removed first.
        FileNotFoundError: if the new venv has no recognizable interpreter
            (the directory is likewise removed).
    """
    global _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR, _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON
    with _MANIFEST_ZSTD_READ_SUBPROC_VENV_LOCK:
        if _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON is not None:
            return _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON
        repo = _paimon_python_root()
        venv_dir = tempfile.mkdtemp(prefix='paimon-zstd-read-subprocess-')
        pip_install_env = _subprocess_env_for_pip()
        uv_bin = shutil.which('uv')
        try:
            if uv_bin:
                # ``uv venv`` seeds no packages by default, yet callers drive this
                # venv with ``python -m pip``; ``--seed`` installs pip. Pinning
                # ``--python`` keeps uv from picking a different interpreter than
                # the one the test's version-based skips were evaluated against.
                subprocess.check_call(
                    [uv_bin, 'venv', '--seed', '--python', sys.executable, venv_dir],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    env=pip_install_env,
                )
                isolated_venv_python = _venv_python_executable(venv_dir)
                subprocess.check_call(
                    [uv_bin, 'pip', 'install', '-q', '--python', isolated_venv_python,
                     '-e', repo, 'requests'],
                    env=pip_install_env,
                )
            else:
                # The stdlib ``venv`` module bootstraps pip by default.
                subprocess.check_call(
                    [sys.executable, '-m', 'venv', venv_dir],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
                isolated_venv_python = _venv_python_executable(venv_dir)
                subprocess.check_call(
                    [isolated_venv_python, '-m', 'pip', 'install', '-q', '-e',
                     repo, 'requests'],
                    env=pip_install_env,
                )
        except Exception:
            # Do not leak a half-built venv directory.
            shutil.rmtree(venv_dir, ignore_errors=True)
            raise
        _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR = venv_dir
        _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON = isolated_venv_python
        return isolated_venv_python
+
+
def _tear_down_manifest_zstd_read_subprocess_venv():
    """Delete the cached read-subprocess venv, if one was recorded, and forget it.

    Deletion is best-effort (errors from ``shutil.rmtree`` are ignored). When no
    venv directory is recorded, nothing changes.
    """
    global _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR, _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON
    with _MANIFEST_ZSTD_READ_SUBPROC_VENV_LOCK:
        stale_dir = _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR
        if not stale_dir:
            return
        shutil.rmtree(stale_dir, ignore_errors=True)
        _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR = None
        _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON = None
+
+
def tearDownModule():
    """unittest module-level hook: remove the shared read-subprocess venv."""
    _tear_down_manifest_zstd_read_subprocess_venv()
+
+
def _write_zstandard_manifest_list_file(table, list_name):
    """Write a one-entry manifest list for ``table`` as a zstandard-coded Avro file.

    The Avro container is encoded directly with fastavro using the
    ``zstandard`` codec (the same codec Java may emit). It is then written
    through ``table.file_io`` into the table's manifest directory.

    The single entry describes a placeholder manifest: ``manifest.avro``,
    1024 bytes, one added and no deleted files, empty partition stats, and
    schema id 0.

    Args:
        table: Table whose manifest directory receives the file.
        list_name: Base name of the manifest list file to create.
    """
    from io import BytesIO
    import fastavro
    from pypaimon.manifest.manifest_list_manager import ManifestListManager
    from pypaimon.manifest.schema.manifest_file_meta import (
        MANIFEST_FILE_META_SCHEMA, ManifestFileMeta)
    from pypaimon.table.row.generic_row import GenericRowSerializer

    entry = ManifestFileMeta(
        file_name='manifest.avro', file_size=1024,
        num_added_files=1, num_deleted_files=0,
        partition_stats=SimpleStats.empty_stats(), schema_id=0,
    )
    stats = entry.partition_stats
    partition_stats_record = {
        '_MIN_VALUES': GenericRowSerializer.to_bytes(stats.min_values),
        '_MAX_VALUES': GenericRowSerializer.to_bytes(stats.max_values),
        '_NULL_COUNTS': stats.null_counts,
    }
    record = {
        '_VERSION': 2,
        '_FILE_NAME': entry.file_name,
        '_FILE_SIZE': entry.file_size,
        '_NUM_ADDED_FILES': entry.num_added_files,
        '_NUM_DELETED_FILES': entry.num_deleted_files,
        '_PARTITION_STATS': partition_stats_record,
        '_SCHEMA_ID': entry.schema_id,
        '_MIN_ROW_ID': entry.min_row_id,
        '_MAX_ROW_ID': entry.max_row_id,
    }

    encoded = BytesIO()
    fastavro.writer(encoded, MANIFEST_FILE_META_SCHEMA, [record], codec='zstandard')
    target_path = '{}/{}'.format(ManifestListManager(table).manifest_path, list_name)
    with table.file_io.new_output_stream(target_path) as out:
        out.write(encoded.getvalue())
+
+
class _ManifestManagerSetup(unittest.TestCase):
"""Shared setup for manifest manager tests.
@@ -191,6 +331,83 @@ class ManifestListManagerTest(_ManifestManagerSetup):
self.assertEqual(len(result), 1)
self.assertEqual(result[0].file_name, "manifest-base.avro")
@unittest.skipIf(
    sys.version_info < (3, 9),
    'PyPI backports.zstd only supports Python 3.9–3.13',
)
@unittest.skipIf(
    sys.version_info >= (3, 14),
    'fastavro uses stdlib compression.zstd on Python 3.14+, not backports.zstd',
)
def test_zstd_manifest_list_fastavro_requires_backports_zstd(self):
    """A zstandard manifest list is readable in a child venv only with ``backports.zstd``.

    The list file is first read in-process as a sanity check. Then
    ``manifest_list_zstd_read_subprocess.py`` (argv: warehouse, table id, list
    file name) is run in the shared isolated venv. With ``backports.zstd``
    uninstalled, the read must fail with fastavro's install hint. After
    ``pip install backports.zstd``, it must succeed and report one entry.
    """
    if not _runner_can_write_zstandard_avro():
        self.skipTest('runner cannot write zstandard Avro')

    list_name = 'zstd-manifest-list-backports-integration'
    _write_zstandard_manifest_list_file(self.table, list_name)

    manager = ManifestListManager(self.table)
    self.assertEqual(len(manager.read(list_name)), 1)
    list_path = os.path.join(manager.manifest_path, list_name)
    self.assertTrue(os.path.isfile(list_path), msg=list_path)

    worker_script = os.path.join(
        os.path.dirname(__file__), 'manifest_list_zstd_read_subprocess.py')
    table_id = 'default.{}'.format(self._table_name)
    venv_python = _manifest_zstd_read_subprocess_venv_python()
    pip_env = _subprocess_env_for_pip()

    def run_worker():
        # Run the read helper in the isolated venv, capturing text output.
        return subprocess.run(
            [venv_python, worker_script, self.catalog.warehouse, table_id, list_name],
            capture_output=True,
            text=True,
        )

    # Best effort: the package may already be absent, so the result is ignored.
    subprocess.run(
        [venv_python, '-m', 'pip', 'uninstall', '-y', 'backports.zstd'],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        env=pip_env,
    )

    without_backend = run_worker()
    combined_output = without_backend.stdout + without_backend.stderr
    self.assertNotEqual(without_backend.returncode, 0, msg=combined_output)
    self.assertIn('zstandard codec is supported but you need to install', combined_output)
    self.assertIn('backports.zstd', combined_output)

    subprocess.check_call(
        [venv_python, '-m', 'pip', 'install', '-q', 'backports.zstd'],
        env=pip_env,
    )
    with_backend = run_worker()
    self.assertEqual(
        with_backend.returncode, 0,
        msg=with_backend.stdout + with_backend.stderr)
    self.assertEqual(with_backend.stdout.strip(), '1')
+
if __name__ == '__main__':
    # Allow running this test module directly as a script.
    unittest.main()