This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 326884cb fix manifest cache (#2951)
326884cb is described below
commit 326884cb617a4b923101192b13f7c12ade9f757e
Author: Kevin Liu <[email protected]>
AuthorDate: Wed Jan 28 20:57:14 2026 -0500
fix manifest cache (#2951)
# Rationale for this change
Fix part of #2325
Context:
https://github.com/apache/iceberg-python/issues/2325#issuecomment-3193881084
Cache individual Manifest File objects instead of the Manifest List (a tuple
of Manifest Files).
This PR fixes the O(N²) cache inefficiency, restoring the expected O(N)
linear growth pattern.
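To make the quadratic growth concrete: after N appends, snapshot i's manifest
list contains i manifest files, so caching each list as its own tuple retains
1 + 2 + ... + N = N(N+1)/2 `ManifestFile` objects, while caching per
`manifest_path` retains only N. A quick illustration of the cache-entry counts
(plain Python arithmetic, not pyiceberg code):

```python
# Cache-entry counts after N appends, where snapshot i's manifest list
# holds manifests 0..i (the overlapping pattern that appends produce).
N = 50  # matches the benchmark's 50 append operations below
old_tuple_caching = N * (N + 1) // 2  # each list re-stores its manifests: O(N^2)
new_per_path_caching = N  # one entry per unique manifest_path: O(N)
print(old_tuple_caching, new_per_path_caching)  # 1275 vs 50
```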
## Are these changes tested?
Yes, with benchmark tests (`tests/benchmark/test_memory_benchmark.py`).
Results from running on the main branch:
https://gist.github.com/kevinjqliu/970f4b51a12aaa0318a2671173430736
Results from running on this branch:
https://gist.github.com/kevinjqliu/24990d18d2cea2fa468597c16bfa27fd
### Benchmark Comparison: main vs kevinjqliu/fix-manifest-cache
| Test | main | fix branch |
|------|:----:|:----------:|
| `test_manifest_cache_memory_growth` | ❌ FAILED | ✅ PASSED |
| `test_memory_after_gc_with_cache_cleared` | ✅ PASSED | ✅ PASSED |
| `test_manifest_cache_deduplication_efficiency` | ✅ PASSED | ✅ PASSED |
#### Memory Growth Benchmark (50 append operations)
| Metric | main | fix branch | Improvement |
|--------|-----:|----------:|------------:|
| Initial memory | 3,233.4 KB | 3,210.7 KB | -0.7% |
| Final memory | 4,280.6 KB | 3,558.9 KB | **-16.9%** |
| Total growth | 1,047.2 KB | 348.1 KB | **-66.8%** |
| Growth per iteration | 26,809 bytes | 8,913 bytes | **-66.8%** |
#### Memory at Each Iteration
| Iteration | main | fix branch | Δ |
|----------:|-----:|-----------:|--:|
| 10 | 3,233.4 KB | 3,210.7 KB | -22.7 KB |
| 20 | 3,471.0 KB | 3,371.4 KB | -99.6 KB |
| 30 | 3,719.3 KB | 3,467.1 KB | -252.2 KB |
| 40 | 3,943.9 KB | 3,483.2 KB | -460.7 KB |
| 50 | 4,280.6 KB | 3,558.9 KB | **-721.7 KB** |
This fix reduces memory growth by ~67%, bringing per-iteration growth
from ~27 KB down to ~9 KB.
The improvement comes from caching individual `ManifestFile` objects by
their `manifest_path` instead of caching entire manifest list tuples.
This deduplicates `ManifestFile` objects that appear in multiple
manifest lists (common after appends).
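The observable effect, which the updated tests assert, is object identity
across overlapping manifest lists. A minimal sketch, assuming `io` is a
`FileIO` instance and `list_a_path`/`list_b_path` are hypothetical paths to
two manifest lists that share their first manifest:

```python
from pyiceberg.manifest import _manifests

# Both lists include the same first manifest; with per-path caching,
# the second read reuses the already-cached ManifestFile instance.
manifests_a = _manifests(io, list_a_path)
manifests_b = _manifests(io, list_b_path)
assert manifests_a[0] is manifests_b[0]  # identity, not just equality
```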
## Are there any user-facing changes?
---------
Co-authored-by: Copilot <[email protected]>
---
pyiceberg/manifest.py | 51 +++++-
tests/benchmark/test_memory_benchmark.py | 287 +++++++++++++++++++++++++++++
tests/utils/test_manifest.py | 301 +++++++++++++++++++++++++++++--
3 files changed, 614 insertions(+), 25 deletions(-)
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index bd83075b..4c68f5e3 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -28,8 +28,7 @@ from typing import (
Literal,
)
-from cachetools import LRUCache, cached
-from cachetools.keys import hashkey
+from cachetools import LRUCache
from pydantic_core import to_json
from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec
@@ -892,15 +891,53 @@ class ManifestFile(Record):
return hash(self.manifest_path)
-# Global cache for manifest lists
-_manifest_cache: LRUCache[Any, tuple[ManifestFile, ...]] =
LRUCache(maxsize=128)
+# Global cache for ManifestFile objects, keyed by manifest_path.
+# This deduplicates ManifestFile objects across manifest lists, which commonly
+# share manifests after append operations.
+_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=128)
+
+# Lock for thread-safe cache access
+_manifest_cache_lock = threading.RLock()
-@cached(cache=_manifest_cache, key=lambda io, manifest_list: hashkey(manifest_list), lock=threading.RLock())
def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
- """Read and cache manifests from the given manifest list, returning a
tuple to prevent modification."""
+ """Read manifests from a manifest list, deduplicating ManifestFile objects
via cache.
+
+    Caches individual ManifestFile objects by manifest_path. This is memory-efficient
+ because consecutive manifest lists typically share most of their manifests:
+
+ ManifestList1: [ManifestFile1]
+ ManifestList2: [ManifestFile1, ManifestFile2]
+ ManifestList3: [ManifestFile1, ManifestFile2, ManifestFile3]
+
+ With per-ManifestFile caching, each ManifestFile is stored once and reused.
+
+    Note: The manifest list file is re-read on each call. This is intentional to
+ keep the implementation simple and avoid O(N²) memory growth from caching
+ overlapping manifest list tuples. Re-reading is cheap since manifest lists
+ are small metadata files.
+
+ Args:
+ io: FileIO instance for reading the manifest list.
+ manifest_list: Path to the manifest list file.
+
+ Returns:
+ A tuple of ManifestFile objects.
+ """
file = io.new_input(manifest_list)
- return tuple(read_manifest_list(file))
+ manifest_files = list(read_manifest_list(file))
+
+ result = []
+ with _manifest_cache_lock:
+ for manifest_file in manifest_files:
+ manifest_path = manifest_file.manifest_path
+ if manifest_path in _manifest_cache:
+ result.append(_manifest_cache[manifest_path])
+ else:
+ _manifest_cache[manifest_path] = manifest_file
+ result.append(manifest_file)
+
+ return tuple(result)
def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
diff --git a/tests/benchmark/test_memory_benchmark.py b/tests/benchmark/test_memory_benchmark.py
new file mode 100644
index 00000000..82454c85
--- /dev/null
+++ b/tests/benchmark/test_memory_benchmark.py
@@ -0,0 +1,287 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Memory benchmarks for manifest cache efficiency.
+
+These benchmarks reproduce the manifest cache memory issue described in:
+https://github.com/apache/iceberg-python/issues/2325
+
+The issue: When caching manifest lists as tuples, overlapping ManifestFile objects
+are duplicated across cache entries, causing O(N²) memory growth instead of O(N).
+
+Run with: uv run pytest tests/benchmark/test_memory_benchmark.py -v -s -m benchmark
+"""
+
+import gc
+import tracemalloc
+from datetime import datetime, timezone
+
+import pyarrow as pa
+import pytest
+
+from pyiceberg.catalog.memory import InMemoryCatalog
+from pyiceberg.manifest import _manifest_cache
+
+
+def generate_test_dataframe() -> pa.Table:
+ """Generate a PyArrow table for testing, similar to the issue's example."""
+    n_rows = 100  # Smaller for faster tests, increase for more realistic benchmarks
+
+ return pa.table(
+ {
+ "event_type": ["playback"] * n_rows,
+ "event_origin": ["origin1"] * n_rows,
+ "event_send_at": [datetime.now(timezone.utc)] * n_rows,
+ "event_saved_at": [datetime.now(timezone.utc)] * n_rows,
+ "id": list(range(n_rows)),
+ "reference_id": [f"ref-{i}" for i in range(n_rows)],
+ }
+ )
+
+
+@pytest.fixture
+def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
+ """Create an in-memory catalog for memory testing."""
+ warehouse_path = str(tmp_path_factory.mktemp("warehouse"))
+    catalog = InMemoryCatalog("memory_test", warehouse=f"file://{warehouse_path}")
+ catalog.create_namespace("default")
+ return catalog
+
+
+@pytest.fixture(autouse=True)
+def clear_caches() -> None:
+ """Clear caches before each test."""
+ _manifest_cache.clear()
+ gc.collect()
+
+
+@pytest.mark.benchmark
+def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None:
+ """Benchmark memory growth of manifest cache during repeated appends.
+
+ This test reproduces the issue from GitHub #2325 where each append creates
+ a new manifest list entry in the cache, causing memory to grow.
+
+    With the old caching strategy (tuple per manifest list), memory grew as O(N²).
+    With the new strategy (individual ManifestFile objects), memory grows as O(N).
+ """
+ df = generate_test_dataframe()
+    table = memory_catalog.create_table("default.memory_test", schema=df.schema)
+
+ tracemalloc.start()
+
+ num_iterations = 50
+    memory_samples: list[tuple[int, int, int]] = []  # (iteration, current_memory, cache_size)
+
+ print("\n--- Manifest Cache Memory Growth Benchmark ---")
+ print(f"Running {num_iterations} append operations...")
+
+ for i in range(num_iterations):
+ table.append(df)
+
+ # Sample memory at intervals
+ if (i + 1) % 10 == 0:
+ current, _ = tracemalloc.get_traced_memory()
+ cache_size = len(_manifest_cache)
+
+ memory_samples.append((i + 1, current, cache_size))
+ print(f" Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache
entries={cache_size}")
+
+ tracemalloc.stop()
+
+ # Analyze memory growth
+ if len(memory_samples) >= 2:
+ first_memory = memory_samples[0][1]
+ last_memory = memory_samples[-1][1]
+ memory_growth = last_memory - first_memory
+        growth_per_iteration = memory_growth / (memory_samples[-1][0] - memory_samples[0][0])
+
+ print("\nResults:")
+ print(f" Initial memory: {first_memory / 1024:.1f} KB")
+ print(f" Final memory: {last_memory / 1024:.1f} KB")
+ print(f" Total growth: {memory_growth / 1024:.1f} KB")
+ print(f" Growth per iteration: {growth_per_iteration:.1f} bytes")
+ print(f" Final cache size: {memory_samples[-1][2]} entries")
+
+ # With efficient caching, growth should be roughly linear (O(N))
+ # rather than quadratic (O(N²)) as it was before
+        # Memory growth includes ManifestFile objects, metadata, and other overhead
+ # We expect about 5-10 KB per iteration for typical workloads
+ # The key improvement is that growth is O(N) not O(N²)
+        # Threshold of 15KB/iteration based on observed behavior - O(N²) would show ~50KB+/iteration
+ max_memory_growth_per_iteration_bytes = 15000
+ assert growth_per_iteration < max_memory_growth_per_iteration_bytes, (
+ f"Memory growth per iteration ({growth_per_iteration:.0f} bytes)
is too high. "
+ "This may indicate the O(N²) cache inefficiency is present."
+ )
+
+
+@pytest.mark.benchmark
+def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) -> None:
+ """Test that clearing the cache allows memory to be reclaimed.
+
+ This test verifies that when we clear the manifest cache, the associated
+ memory can be garbage collected.
+ """
+ df = generate_test_dataframe()
+ table = memory_catalog.create_table("default.gc_test", schema=df.schema)
+
+ tracemalloc.start()
+
+ print("\n--- Memory After GC Benchmark ---")
+
+ # Phase 1: Fill the cache
+ print("Phase 1: Filling cache with 20 appends...")
+ for _ in range(20):
+ table.append(df)
+
+ gc.collect()
+ before_clear_memory, _ = tracemalloc.get_traced_memory()
+ cache_size_before = len(_manifest_cache)
+ print(f" Memory before clear: {before_clear_memory / 1024:.1f} KB")
+ print(f" Cache size: {cache_size_before}")
+
+ # Phase 2: Clear cache and GC
+ print("\nPhase 2: Clearing cache and running GC...")
+ _manifest_cache.clear()
+ gc.collect()
+ gc.collect() # Multiple GC passes for thorough cleanup
+
+ after_clear_memory, _ = tracemalloc.get_traced_memory()
+ print(f" Memory after clear: {after_clear_memory / 1024:.1f} KB")
+ print(f" Memory reclaimed: {(before_clear_memory - after_clear_memory) /
1024:.1f} KB")
+
+ tracemalloc.stop()
+
+ memory_reclaimed = before_clear_memory - after_clear_memory
+ print("\nResults:")
+ print(f" Memory reclaimed by clearing cache: {memory_reclaimed /
1024:.1f} KB")
+
+ # Verify that clearing the cache actually freed some memory
+ # Note: This may be flaky in some environments due to GC behavior
+    assert memory_reclaimed >= 0, "Memory should not increase after clearing cache"
+
+
+@pytest.mark.benchmark
+def test_manifest_cache_deduplication_efficiency() -> None:
+ """Benchmark the efficiency of the per-ManifestFile caching strategy.
+
+ This test verifies that when multiple manifest lists share the same
+ ManifestFile objects, they are properly deduplicated in the cache.
+ """
+ from tempfile import TemporaryDirectory
+
+ from pyiceberg.io.pyarrow import PyArrowFileIO
+ from pyiceberg.manifest import (
+ DataFile,
+ DataFileContent,
+ FileFormat,
+ ManifestEntry,
+ ManifestEntryStatus,
+ _manifests,
+ write_manifest,
+ write_manifest_list,
+ )
+ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
+ from pyiceberg.schema import Schema
+ from pyiceberg.typedef import Record
+ from pyiceberg.types import IntegerType, NestedField
+
+ io = PyArrowFileIO()
+
+ print("\n--- Manifest Cache Deduplication Benchmark ---")
+
+ with TemporaryDirectory() as tmp_dir:
+        schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
+ spec = UNPARTITIONED_PARTITION_SPEC
+
+ # Create N manifest files
+ num_manifests = 20
+ manifest_files = []
+
+ print(f"Creating {num_manifests} manifest files...")
+ for i in range(num_manifests):
+ manifest_path = f"{tmp_dir}/manifest_{i}.avro"
+ with write_manifest(
+ format_version=2,
+ spec=spec,
+ schema=schema,
+ output_file=io.new_output(manifest_path),
+ snapshot_id=i + 1,
+ avro_compression="null",
+ ) as writer:
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=f"{tmp_dir}/data_{i}.parquet",
+ file_format=FileFormat.PARQUET,
+ partition=Record(),
+ record_count=100,
+ file_size_in_bytes=1000,
+ )
+ writer.add_entry(
+ ManifestEntry.from_args(
+ status=ManifestEntryStatus.ADDED,
+ snapshot_id=i + 1,
+ data_file=data_file,
+ )
+ )
+ manifest_files.append(writer.to_manifest_file())
+
+ # Create multiple manifest lists with overlapping manifest files
+ # List i contains manifest files 0 through i
+ num_lists = 10
+ print(f"Creating {num_lists} manifest lists with overlapping
manifests...")
+
+ _manifest_cache.clear()
+
+ for i in range(num_lists):
+ list_path = f"{tmp_dir}/manifest-list_{i}.avro"
+ manifests_to_include = manifest_files[: i + 1]
+
+ with write_manifest_list(
+ format_version=2,
+ output_file=io.new_output(list_path),
+ snapshot_id=i + 1,
+ parent_snapshot_id=i if i > 0 else None,
+ sequence_number=i + 1,
+ avro_compression="null",
+ ) as list_writer:
+ list_writer.add_manifests(manifests_to_include)
+
+            # Read the manifest list using _manifests (this populates the cache)
+ _manifests(io, list_path)
+
+ # Analyze cache efficiency
+ cache_entries = len(_manifest_cache)
+        # List i contains manifests 0..i, so only the first num_lists manifests are actually used
+ manifests_actually_used = num_lists
+
+ print("\nResults:")
+ print(f" Manifest lists created: {num_lists}")
+ print(f" Manifest files created: {num_manifests}")
+ print(f" Manifest files actually used: {manifests_actually_used}")
+ print(f" Cache entries: {cache_entries}")
+
+ # With efficient per-ManifestFile caching, we should have exactly
+ # manifests_actually_used entries (one per unique manifest path)
+ print(f"\n Expected cache entries (efficient):
{manifests_actually_used}")
+ print(f" Actual cache entries: {cache_entries}")
+
+ # The cache should be efficient - one entry per unique manifest path
+ assert cache_entries == manifests_actually_used, (
+ f"Cache has {cache_entries} entries, expected exactly
{manifests_actually_used}. "
+ "The cache may not be deduplicating properly."
+ )
diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py
index abd9878e..3862d6b6 100644
--- a/tests/utils/test_manifest.py
+++ b/tests/utils/test_manifest.py
@@ -16,7 +16,6 @@
# under the License.
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
from tempfile import TemporaryDirectory
-from unittest.mock import patch
import fastavro
import pytest
@@ -29,10 +28,12 @@ from pyiceberg.manifest import (
DataFileContent,
FileFormat,
ManifestContent,
+ ManifestEntry,
ManifestEntryStatus,
ManifestFile,
PartitionFieldSummary,
_manifest_cache,
+ _manifests,
read_manifest_list,
write_manifest,
write_manifest_list,
@@ -314,27 +315,33 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
def test_read_manifest_cache(generated_manifest_file_file_v2: str) -> None:
- with patch("pyiceberg.manifest.read_manifest_list") as
mocked_read_manifest_list:
- io = load_file_io()
+ """Test that ManifestFile objects are cached and reused across multiple
reads.
- snapshot = Snapshot(
- snapshot_id=25,
- parent_snapshot_id=19,
- timestamp_ms=1602638573590,
- manifest_list=generated_manifest_file_file_v2,
- summary=Summary(Operation.APPEND),
- schema_id=3,
- )
+    The cache now stores individual ManifestFile objects by their manifest_path,
+    rather than caching entire manifest list tuples. This is more memory-efficient
+ when multiple manifest lists share overlapping ManifestFile objects.
+ """
+ io = load_file_io()
- # Access the manifests property multiple times to test caching
- manifests_first_call = snapshot.manifests(io)
- manifests_second_call = snapshot.manifests(io)
+ snapshot = Snapshot(
+ snapshot_id=25,
+ parent_snapshot_id=19,
+ timestamp_ms=1602638573590,
+ manifest_list=generated_manifest_file_file_v2,
+ summary=Summary(Operation.APPEND),
+ schema_id=3,
+ )
- # Ensure that read_manifest_list was called only once
- mocked_read_manifest_list.assert_called_once()
+ # Access the manifests property multiple times
+ manifests_first_call = snapshot.manifests(io)
+ manifests_second_call = snapshot.manifests(io)
- # Ensure that the same manifest list is returned
- assert manifests_first_call == manifests_second_call
+ # Ensure that the same manifest list content is returned
+ assert manifests_first_call == manifests_second_call
+
+ # Verify that ManifestFile objects are the same instances (cached)
+    for mf1, mf2 in zip(manifests_first_call, manifests_second_call, strict=True):
+        assert mf1 is mf2, "ManifestFile objects should be the same cached instance"
def test_write_empty_manifest() -> None:
@@ -632,3 +639,261 @@ def test_file_format_case_insensitive(raw_file_format: str, expected_file_format
else:
with pytest.raises(ValueError):
_ = FileFormat(raw_file_format)
+
+
+def test_manifest_cache_deduplicates_manifest_files() -> None:
+ """Test that the manifest cache deduplicates ManifestFile objects across
manifest lists.
+
+    This test verifies the fix for https://github.com/apache/iceberg-python/issues/2325
+
+    The issue was that when caching manifest lists by their path, overlapping ManifestFile
+ objects were duplicated. For example:
+ - ManifestList1: (ManifestFile1)
+ - ManifestList2: (ManifestFile1, ManifestFile2)
+ - ManifestList3: (ManifestFile1, ManifestFile2, ManifestFile3)
+
+ With the old approach, ManifestFile1 was stored 3 times in the cache.
+    With the new approach, ManifestFile objects are cached individually by their
+ manifest_path, so ManifestFile1 is stored only once and reused.
+ """
+ io = PyArrowFileIO()
+
+ with TemporaryDirectory() as tmp_dir:
+        # Create three manifest files to simulate manifests created during appends
+ manifest1_path = f"{tmp_dir}/manifest1.avro"
+ manifest2_path = f"{tmp_dir}/manifest2.avro"
+ manifest3_path = f"{tmp_dir}/manifest3.avro"
+
+        schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
+ spec = UNPARTITIONED_PARTITION_SPEC
+
+ # Create manifest file 1
+ with write_manifest(
+ format_version=2,
+ spec=spec,
+ schema=schema,
+ output_file=io.new_output(manifest1_path),
+ snapshot_id=1,
+ avro_compression="zstandard",
+ ) as writer:
+ data_file1 = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=f"{tmp_dir}/data1.parquet",
+ file_format=FileFormat.PARQUET,
+ partition=Record(),
+ record_count=100,
+ file_size_in_bytes=1000,
+ )
+ writer.add_entry(
+ ManifestEntry.from_args(
+ status=ManifestEntryStatus.ADDED,
+ snapshot_id=1,
+ data_file=data_file1,
+ )
+ )
+ manifest_file1 = writer.to_manifest_file()
+
+ # Create manifest file 2
+ with write_manifest(
+ format_version=2,
+ spec=spec,
+ schema=schema,
+ output_file=io.new_output(manifest2_path),
+ snapshot_id=2,
+ avro_compression="zstandard",
+ ) as writer:
+ data_file2 = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=f"{tmp_dir}/data2.parquet",
+ file_format=FileFormat.PARQUET,
+ partition=Record(),
+ record_count=200,
+ file_size_in_bytes=2000,
+ )
+ writer.add_entry(
+ ManifestEntry.from_args(
+ status=ManifestEntryStatus.ADDED,
+ snapshot_id=2,
+ data_file=data_file2,
+ )
+ )
+ manifest_file2 = writer.to_manifest_file()
+
+ # Create manifest file 3
+ with write_manifest(
+ format_version=2,
+ spec=spec,
+ schema=schema,
+ output_file=io.new_output(manifest3_path),
+ snapshot_id=3,
+ avro_compression="zstandard",
+ ) as writer:
+ data_file3 = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=f"{tmp_dir}/data3.parquet",
+ file_format=FileFormat.PARQUET,
+ partition=Record(),
+ record_count=300,
+ file_size_in_bytes=3000,
+ )
+ writer.add_entry(
+ ManifestEntry.from_args(
+ status=ManifestEntryStatus.ADDED,
+ snapshot_id=3,
+ data_file=data_file3,
+ )
+ )
+ manifest_file3 = writer.to_manifest_file()
+
+ # Create manifest list 1: contains only manifest1
+ manifest_list1_path = f"{tmp_dir}/manifest-list1.avro"
+ with write_manifest_list(
+ format_version=2,
+ output_file=io.new_output(manifest_list1_path),
+ snapshot_id=1,
+ parent_snapshot_id=None,
+ sequence_number=1,
+ avro_compression="zstandard",
+ ) as list_writer:
+ list_writer.add_manifests([manifest_file1])
+
+        # Create manifest list 2: contains manifest1 and manifest2 (overlapping manifest1)
+ manifest_list2_path = f"{tmp_dir}/manifest-list2.avro"
+ with write_manifest_list(
+ format_version=2,
+ output_file=io.new_output(manifest_list2_path),
+ snapshot_id=2,
+ parent_snapshot_id=1,
+ sequence_number=2,
+ avro_compression="zstandard",
+ ) as list_writer:
+ list_writer.add_manifests([manifest_file1, manifest_file2])
+
+        # Create manifest list 3: contains all three manifests (overlapping manifest1 and manifest2)
+ manifest_list3_path = f"{tmp_dir}/manifest-list3.avro"
+ with write_manifest_list(
+ format_version=2,
+ output_file=io.new_output(manifest_list3_path),
+ snapshot_id=3,
+ parent_snapshot_id=2,
+ sequence_number=3,
+ avro_compression="zstandard",
+ ) as list_writer:
+            list_writer.add_manifests([manifest_file1, manifest_file2, manifest_file3])
+
+ # Read all three manifest lists
+ manifests1 = _manifests(io, manifest_list1_path)
+ manifests2 = _manifests(io, manifest_list2_path)
+ manifests3 = _manifests(io, manifest_list3_path)
+
+ # Verify the manifest files have the expected paths
+ assert len(manifests1) == 1
+ assert len(manifests2) == 2
+ assert len(manifests3) == 3
+
+        # Verify that ManifestFile objects with the same manifest_path are the same object (identity)
+        # This is the key assertion - if caching works correctly, the same ManifestFile
+ # object should be reused instead of creating duplicates
+
+ # manifest_file1 appears in all three lists - should be the same object
+        assert manifests1[0] is manifests2[0], "ManifestFile1 should be the same object instance across manifest lists"
+        assert manifests2[0] is manifests3[0], "ManifestFile1 should be the same object instance across manifest lists"
+
+ # manifest_file2 appears in lists 2 and 3 - should be the same object
+        assert manifests2[1] is manifests3[1], "ManifestFile2 should be the same object instance across manifest lists"
+
+ # Verify cache size - should only have 3 unique ManifestFile objects
+ # instead of 1 + 2 + 3 = 6 objects as with the old approach
+ assert len(_manifest_cache) == 3, (
+ f"Cache should contain exactly 3 unique ManifestFile objects, but
has {len(_manifest_cache)}"
+ )
+
+
+def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
+ """Test that the manifest cache remains efficient with many overlapping
manifest lists.
+
+    This simulates the scenario from GitHub issue #2325 where many appends create
+ manifest lists that increasingly overlap.
+ """
+ io = PyArrowFileIO()
+
+ with TemporaryDirectory() as tmp_dir:
+        schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
+ spec = UNPARTITIONED_PARTITION_SPEC
+
+ num_manifests = 10
+ manifest_files = []
+
+ # Create N manifest files
+ for i in range(num_manifests):
+ manifest_path = f"{tmp_dir}/manifest{i}.avro"
+ with write_manifest(
+ format_version=2,
+ spec=spec,
+ schema=schema,
+ output_file=io.new_output(manifest_path),
+ snapshot_id=i + 1,
+ avro_compression="zstandard",
+ ) as writer:
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=f"{tmp_dir}/data{i}.parquet",
+ file_format=FileFormat.PARQUET,
+ partition=Record(),
+ record_count=100 * (i + 1),
+ file_size_in_bytes=1000 * (i + 1),
+ )
+ writer.add_entry(
+ ManifestEntry.from_args(
+ status=ManifestEntryStatus.ADDED,
+ snapshot_id=i + 1,
+ data_file=data_file,
+ )
+ )
+ manifest_files.append(writer.to_manifest_file())
+
+        # Create N manifest lists, each containing an increasing number of manifests
+ # list[i] contains manifests[0:i+1]
+ manifest_list_paths = []
+ for i in range(num_manifests):
+ list_path = f"{tmp_dir}/manifest-list{i}.avro"
+ with write_manifest_list(
+ format_version=2,
+ output_file=io.new_output(list_path),
+ snapshot_id=i + 1,
+ parent_snapshot_id=i if i > 0 else None,
+ sequence_number=i + 1,
+ avro_compression="zstandard",
+ ) as list_writer:
+ list_writer.add_manifests(manifest_files[: i + 1])
+ manifest_list_paths.append(list_path)
+
+ # Read all manifest lists
+ all_results = []
+ for path in manifest_list_paths:
+ result = _manifests(io, path)
+ all_results.append(result)
+
+ # With the old cache approach, we would have:
+ # 1 + 2 + 3 + ... + N = N*(N+1)/2 ManifestFile objects in memory
+ # With the new approach, we should have exactly N objects
+
+ # Verify cache has exactly N unique entries
+ assert len(_manifest_cache) == num_manifests, (
+ f"Cache should contain exactly {num_manifests} ManifestFile
objects, "
+ f"but has {len(_manifest_cache)}. "
+ f"Old approach would have {num_manifests * (num_manifests + 1) //
2} objects."
+ )
+
+        # Verify object identity - all references to the same manifest should be the same object
+ for i in range(num_manifests):
+ # Find all references to this manifest across all results
+ references = []
+ for j, result in enumerate(all_results):
+                if j >= i: # This manifest should be in lists from i onwards
+                    references.append(result[i])
+
+ # All references should be the same object
+ if len(references) > 1:
+ for ref in references[1:]:
+                    assert ref is references[0], f"All references to manifest {i} should be the same object instance"