This is an automated email from the ASF dual-hosted git repository.
sungwy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 6b7f07a3 Use `cachetools's LRUCache` to cache manifest list (#1187)
6b7f07a3 is described below
commit 6b7f07a3815349297f765c919f8fa170c2c4562b
Author: Kevin Liu <[email protected]>
AuthorDate: Tue Sep 24 08:45:53 2024 -0700
Use `cachetools's LRUCache` to cache manifest list (#1187)
* use cachetools
* use LRU cache
* return tuple
* comment
* clear global cache for tests
* move _manifests to manifest.py
* rebase poetry.lock
---
poetry.lock | 4 +-
pyiceberg/manifest.py | 10 ++
pyiceberg/table/snapshots.py | 12 +-
pyproject.toml | 313 +++++++++++++++++++++++++++++++++++++++++++
tests/utils/test_manifest.py | 32 +++++
5 files changed, 359 insertions(+), 12 deletions(-)
diff --git a/poetry.lock b/poetry.lock
index 34a884cb..6a82f274 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -439,7 +439,7 @@ virtualenv = ["virtualenv (>=20.0.35)"]
name = "cachetools"
version = "5.5.0"
description = "Extensible memoizing collections and decorators"
-optional = true
+optional = false
python-versions = ">=3.7"
files = [
{file = "cachetools-5.5.0-py3-none-any.whl", hash = "sha256:02134e8439cdc2ffb62023ce1debca2944c3f289d66bb17ead3ab3dede74b292"},
@@ -4650,4 +4650,4 @@ zstandard = ["zstandard"]
[metadata]
lock-version = "2.0"
python-versions = "^3.8, <3.13, !=3.9.7"
-content-hash = "086f6774fe006d24ded1141068f63f2aa22c356c75979b2b44781d43dc10d977"
+content-hash = "66129acb77e056f086d3cff1d3cfb74d25518ad9ebf03d3ca7e4add0ec9b3221"
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index 960952d0..649840fc 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -28,9 +28,12 @@ from typing import (
List,
Literal,
Optional,
+ Tuple,
Type,
)
+from cachetools import LRUCache, cached
+from cachetools.keys import hashkey
from pydantic_core import to_json
from pyiceberg.avro.file import AvroFile, AvroOutputFile
@@ -620,6 +623,13 @@ class ManifestFile(Record):
]
+@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list))
+def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]:
+ """Read and cache manifests from the given manifest list, returning a tuple to prevent modification."""
+ file = io.new_input(manifest_list)
+ return tuple(read_manifest_list(file))
+
+
def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
"""
Read the manifests from the manifest list.
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index 980399a2..829bd602 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -19,13 +19,12 @@ from __future__ import annotations
import time
from collections import defaultdict
from enum import Enum
-from functools import lru_cache
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional
from pydantic import Field, PrivateAttr, model_serializer
from pyiceberg.io import FileIO
-from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
+from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
@@ -231,13 +230,6 @@ class Summary(IcebergBaseModel, Mapping[str, str]):
)
-@lru_cache
-def _manifests(io: FileIO, manifest_list: str) -> List[ManifestFile]:
- """Return the manifests from the manifest list."""
- file = io.new_input(manifest_list)
- return list(read_manifest_list(file))
-
-
class Snapshot(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None)
@@ -260,7 +252,7 @@ class Snapshot(IcebergBaseModel):
def manifests(self, io: FileIO) -> List[ManifestFile]:
"""Return the manifests for the given snapshot."""
if self.manifest_list:
- return _manifests(io, self.manifest_list)
+ return list(_manifests(io, self.manifest_list))
return []
diff --git a/pyproject.toml b/pyproject.toml
index 7126f905..c7739137 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -83,6 +83,7 @@ numpy = [
{ version = "1.26.0", python = ">=3.9,<3.13", optional = true },
{ version = "1.24.4", python = ">=3.8,<3.9", optional = true }
]
+cachetools = "^5.5.0"
[tool.poetry.group.dev.dependencies]
pytest = "7.4.4"
@@ -571,6 +572,318 @@ ignore_missing_imports = true
module = "tenacity.*"
ignore_missing_imports = true
+[[tool.mypy.overrides]]
+module = "pyarrow.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pandas.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "snappy.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "zstandard.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pydantic.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pydantic_core.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pytest.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "fastavro.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "mmh3.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "hive_metastore.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "thrift.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "requests_mock.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "click.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "rich.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "fsspec.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "s3fs.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "azure.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "adlfs.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "gcsfs.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "packaging.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "tests.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "boto3"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "botocore.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "mypy_boto3_glue.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "moto"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "aiobotocore.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "aiohttp.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "duckdb.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "ray.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "daft.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pyparsing.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pyspark.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "strictyaml.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "sortedcontainers.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "numpy.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "sqlalchemy.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "Cython.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "setuptools.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "tenacity.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pyarrow.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pandas.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "snappy.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "zstandard.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pydantic.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pydantic_core.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pytest.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "fastavro.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "mmh3.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "hive_metastore.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "thrift.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "requests_mock.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "click.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "rich.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "fsspec.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "s3fs.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "azure.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "adlfs.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "gcsfs.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "packaging.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "tests.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "boto3"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "botocore.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "mypy_boto3_glue.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "moto"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "aiobotocore.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "aiohttp.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "duckdb.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "ray.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "daft.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pyparsing.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "pyspark.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "strictyaml.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "sortedcontainers.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "numpy.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "sqlalchemy.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "Cython.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "setuptools.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "tenacity.*"
+ignore_missing_imports = true
+
[tool.poetry.scripts]
pyiceberg = "pyiceberg.cli.console:run"
diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py
index ef33b16b..bb60ac0a 100644
--- a/tests/utils/test_manifest.py
+++ b/tests/utils/test_manifest.py
@@ -17,6 +17,7 @@
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
from tempfile import TemporaryDirectory
from typing import Dict
+from unittest.mock import patch
import fastavro
import pytest
@@ -31,6 +32,7 @@ from pyiceberg.manifest import (
ManifestEntryStatus,
ManifestFile,
PartitionFieldSummary,
+ _manifests,
read_manifest_list,
write_manifest,
write_manifest_list,
@@ -43,6 +45,12 @@ from pyiceberg.typedef import Record, TableVersion
from pyiceberg.types import IntegerType, NestedField
+@pytest.fixture(autouse=True)
+def clear_global_manifests_cache() -> None:
+ # Clear the global cache before each test
+ _manifests.cache_clear() # type: ignore
+
+
def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: Dict[str, str]) -> None:
with open(avro_file, "rb") as f:
reader = fastavro.reader(f)
@@ -306,6 +314,30 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
assert entry.status == ManifestEntryStatus.ADDED
+def test_read_manifest_cache(generated_manifest_file_file_v2: str) -> None:
+ with patch("pyiceberg.manifest.read_manifest_list") as mocked_read_manifest_list:
+ io = load_file_io()
+
+ snapshot = Snapshot(
+ snapshot_id=25,
+ parent_snapshot_id=19,
+ timestamp_ms=1602638573590,
+ manifest_list=generated_manifest_file_file_v2,
+ summary=Summary(Operation.APPEND),
+ schema_id=3,
+ )
+
+ # Access the manifests property multiple times to test caching
+ manifests_first_call = snapshot.manifests(io)
+ manifests_second_call = snapshot.manifests(io)
+
+ # Ensure that read_manifest_list was called only once
+ mocked_read_manifest_list.assert_called_once()
+
+ # Ensure that the same manifest list is returned
+ assert manifests_first_call == manifests_second_call
+
+
def test_write_empty_manifest() -> None:
io = load_file_io()
test_schema = Schema(NestedField(1, "foo", IntegerType(), False))