This is an automated email from the ASF dual-hosted git repository.
sungwy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 1d4ed060 backward compat (#1151)
1d4ed060 is described below
commit 1d4ed06066e293652841e2a159a59347170ee309
Author: Sung Yun <[email protected]>
AuthorDate: Thu Sep 12 20:07:48 2024 -0400
backward compat (#1151)
---
pyiceberg/table/__init__.py | 65 ++++++++++++++++++++++++++++++++++++--
pyiceberg/table/update/schema.py | 42 +++++++++++++-----------
pyiceberg/table/update/snapshot.py | 26 +++++++--------
tests/table/test_init.py | 15 +++++++++
4 files changed, 114 insertions(+), 34 deletions(-)
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 21304cd5..3eedff45 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -106,8 +106,15 @@ from pyiceberg.table.update import (
UpgradeFormatVersionUpdate,
update_table_metadata,
)
-from pyiceberg.table.update.schema import UpdateSchema
-from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot
+from pyiceberg.table.update.schema import UpdateSchema, _Move, _MoveOperation
+from pyiceberg.table.update.snapshot import (
+ ManageSnapshots,
+ UpdateSnapshot,
+ _DeleteFiles,
+ _FastAppendFiles,
+ _MergeAppendFiles,
+ _OverwriteFiles,
+)
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.typedef import (
EMPTY_DICT,
@@ -1464,3 +1471,57 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List
from pyiceberg.io.pyarrow import parquet_files_to_data_files
yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))
+
+
+@deprecated(
+ deprecated_in="0.8.0",
+ removed_in="0.9.0",
+ help_message="pyiceberg.table.Move has been changed to private class pyiceberg.table.update.schema._Move",
+)
+def Move(*args: Any, **kwargs: Any) -> _Move:
+ return _Move(*args, **kwargs)
+
+
+@deprecated(
+ deprecated_in="0.8.0",
+ removed_in="0.9.0",
+ help_message="pyiceberg.table.MoveOperation has been changed to private class pyiceberg.table.update.schema._MoveOperation",
+)
+def MoveOperation(*args: Any, **kwargs: Any) -> _MoveOperation:
+ return _MoveOperation(*args, **kwargs)
+
+
+@deprecated(
+ deprecated_in="0.8.0",
+ removed_in="0.9.0",
+ help_message="pyiceberg.table.DeleteFiles has been changed to private class pyiceberg.table.update.snapshot._DeleteFiles",
+)
+def DeleteFiles(*args: Any, **kwargs: Any) -> _DeleteFiles:
+ return _DeleteFiles(*args, **kwargs)
+
+
+@deprecated(
+ deprecated_in="0.8.0",
+ removed_in="0.9.0",
+ help_message="pyiceberg.table.FastAppendFiles has been changed to private class pyiceberg.table.update.snapshot._FastAppendFiles",
+)
+def FastAppendFiles(*args: Any, **kwargs: Any) -> _FastAppendFiles:
+ return _FastAppendFiles(*args, **kwargs)
+
+
+@deprecated(
+ deprecated_in="0.8.0",
+ removed_in="0.9.0",
+ help_message="pyiceberg.table.MergeAppendFiles has been changed to private class pyiceberg.table.update.snapshot._MergeAppendFiles",
+)
+def MergeAppendFiles(*args: Any, **kwargs: Any) -> _MergeAppendFiles:
+ return _MergeAppendFiles(*args, **kwargs)
+
+
+@deprecated(
+ deprecated_in="0.8.0",
+ removed_in="0.9.0",
+ help_message="pyiceberg.table.OverwriteFiles has been changed to private class pyiceberg.table.update.snapshot._OverwriteFiles",
+)
+def OverwriteFiles(*args: Any, **kwargs: Any) -> _OverwriteFiles:
+ return _OverwriteFiles(*args, **kwargs)
diff --git a/pyiceberg/table/update/schema.py b/pyiceberg/table/update/schema.py
index 471567fe..0442a604 100644
--- a/pyiceberg/table/update/schema.py
+++ b/pyiceberg/table/update/schema.py
@@ -57,17 +57,17 @@ if TYPE_CHECKING:
TABLE_ROOT_ID = -1
-class MoveOperation(Enum):
+class _MoveOperation(Enum):
First = 1
Before = 2
After = 3
@dataclass
-class Move:
+class _Move:
field_id: int
full_name: str
- op: MoveOperation
+ op: _MoveOperation
other_field_id: Optional[int] = None
@@ -79,7 +79,7 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
_adds: Dict[int, List[NestedField]] = {}
_updates: Dict[int, NestedField] = {}
_deletes: Set[int] = set()
- _moves: Dict[int, List[Move]] = {}
+ _moves: Dict[int, List[_Move]] = {}
_added_name_to_id: Dict[str, int] = {}
# Part of https://github.com/apache/iceberg/pull/8393
@@ -146,7 +146,7 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
visit_with_partner(
Catalog._convert_schema_if_needed(new_schema),
-1,
- UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),
+ _UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),  # type: ignore
PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive),
)
@@ -410,13 +410,13 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
return self._added_name_to_id.get(name)
- def _move(self, move: Move) -> None:
+ def _move(self, move: _Move) -> None:
if parent_name := self._id_to_parent.get(move.field_id):
parent_field = self._schema.find_field(parent_name, case_sensitive=self._case_sensitive)
if not parent_field.field_type.is_struct:
raise ValueError(f"Cannot move fields in non-struct type: {parent_field.field_type}")
- if move.op == MoveOperation.After or move.op == MoveOperation.Before:
+ if move.op == _MoveOperation.After or move.op == _MoveOperation.Before:
if move.other_field_id is None:
raise ValueError("Expected other field when performing before/after move")
@@ -426,7 +426,7 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
self._moves[parent_field.field_id] = self._moves.get(parent_field.field_id, []) + [move]
else:
# In the top level field
- if move.op == MoveOperation.After or move.op == MoveOperation.Before:
+ if move.op == _MoveOperation.After or move.op == _MoveOperation.Before:
if move.other_field_id is None:
raise ValueError("Expected other field when performing before/after move")
@@ -451,7 +451,7 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
if field_id is None:
raise ValueError(f"Cannot move missing column: {full_name}")
- self._move(Move(field_id=field_id, full_name=full_name, op=MoveOperation.First))
+ self._move(_Move(field_id=field_id, full_name=full_name, op=_MoveOperation.First))
return self
@@ -485,7 +485,7 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
if field_id == before_field_id:
raise ValueError(f"Cannot move {full_name} before itself")
- self._move(Move(field_id=field_id, full_name=full_name, other_field_id=before_field_id, op=MoveOperation.Before))
+ self._move(_Move(field_id=field_id, full_name=full_name, other_field_id=before_field_id, op=_MoveOperation.Before))
return self
@@ -514,7 +514,7 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
if field_id == after_field_id:
raise ValueError(f"Cannot move {full_name} after itself")
- self._move(Move(field_id=field_id, full_name=full_name, other_field_id=after_field_id, op=MoveOperation.After))
+ self._move(_Move(field_id=field_id, full_name=full_name, other_field_id=after_field_id, op=_MoveOperation.After))
return self
@@ -592,10 +592,14 @@ class _ApplyChanges(SchemaVisitor[Optional[IcebergType]]):
_adds: Dict[int, List[NestedField]]
_updates: Dict[int, NestedField]
_deletes: Set[int]
- _moves: Dict[int, List[Move]]
+ _moves: Dict[int, List[_Move]]
def __init__(
- self, adds: Dict[int, List[NestedField]], updates: Dict[int, NestedField], deletes: Set[int], moves: Dict[int, List[Move]]
+ self,
+ adds: Dict[int, List[NestedField]],
+ updates: Dict[int, NestedField],
+ deletes: Set[int],
+ moves: Dict[int, List[_Move]],
) -> None:
self._adds = adds
self._updates = updates
@@ -715,7 +719,7 @@ class _ApplyChanges(SchemaVisitor[Optional[IcebergType]]):
return primitive
-class UnionByNameVisitor(SchemaWithPartnerVisitor[int, bool]):
+class _UnionByNameVisitor(SchemaWithPartnerVisitor[int, bool]):
update_schema: UpdateSchema
existing_schema: Schema
case_sensitive: bool
@@ -873,7 +877,7 @@ def _add_fields(fields: Tuple[NestedField, ...], adds: Optional[List[NestedField
return fields + tuple(adds)
-def _move_fields(fields: Tuple[NestedField, ...], moves: List[Move]) -> Tuple[NestedField, ...]:
+def _move_fields(fields: Tuple[NestedField, ...], moves: List[_Move]) -> Tuple[NestedField, ...]:
reordered = list(copy(fields))
for move in moves:
# Find the field that we're about to move
@@ -881,12 +885,12 @@ def _move_fields(fields: Tuple[NestedField, ...], moves: List[Move]) -> Tuple[Ne
# Remove the field that we're about to move from the list
reordered = [field for field in reordered if field.field_id != move.field_id]
- if move.op == MoveOperation.First:
+ if move.op == _MoveOperation.First:
reordered = [field] + reordered
- elif move.op == MoveOperation.Before or move.op == MoveOperation.After:
+ elif move.op == _MoveOperation.Before or move.op == _MoveOperation.After:
other_field_id = move.other_field_id
other_field_pos = next(i for i, field in enumerate(reordered) if field.field_id == other_field_id)
- if move.op == MoveOperation.Before:
+ if move.op == _MoveOperation.Before:
reordered.insert(other_field_pos, field)
else:
reordered.insert(other_field_pos + 1, field)
@@ -897,7 +901,7 @@ def _move_fields(fields: Tuple[NestedField, ...], moves: List[Move]) -> Tuple[Ne
def _add_and_move_fields(
- fields: Tuple[NestedField, ...], adds: List[NestedField], moves: List[Move]
+ fields: Tuple[NestedField, ...], adds: List[NestedField], moves: List[_Move]
) -> Optional[Tuple[NestedField, ...]]:
if len(adds) > 0:
# always apply adds first so that added fields can be moved
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index 2b9354d2..47e5fc55 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -307,7 +307,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)
-class DeleteFiles(_SnapshotProducer["DeleteFiles"]):
+class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
"""Will delete manifest entries from the current snapshot based on the predicate.
This will produce a DELETE snapshot:
@@ -443,7 +443,7 @@ class DeleteFiles(_SnapshotProducer["DeleteFiles"]):
return len(self._deleted_entries()) > 0
-class FastAppendFiles(_SnapshotProducer["FastAppendFiles"]):
+class _FastAppendFiles(_SnapshotProducer["_FastAppendFiles"]):
def _existing_manifests(self) -> List[ManifestFile]:
"""To determine if there are any existing manifest files.
@@ -472,7 +472,7 @@ class FastAppendFiles(_SnapshotProducer["FastAppendFiles"]):
return []
-class MergeAppendFiles(FastAppendFiles):
+class _MergeAppendFiles(_FastAppendFiles):
_target_size_bytes: int
_min_count_to_merge: int
_merge_enabled: bool
@@ -507,7 +507,7 @@ class MergeAppendFiles(FastAppendFiles):
def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
"""To perform any post-processing on the manifests before writing them to the new snapshot.
- In MergeAppendFiles, we merge manifests based on the target size and the minimum count to merge
+ In _MergeAppendFiles, we merge manifests based on the target size and the minimum count to merge
if automatic merge is enabled.
"""
unmerged_data_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DATA]
@@ -523,7 +523,7 @@ class MergeAppendFiles(FastAppendFiles):
return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests
-class OverwriteFiles(_SnapshotProducer["OverwriteFiles"]):
+class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]):
"""Overwrites data from the table. This will produce an OVERWRITE snapshot.
Data and delete files were added and removed in a logical overwrite operation.
@@ -610,18 +610,18 @@ class UpdateSnapshot:
self._io = io
self._snapshot_properties = snapshot_properties
- def fast_append(self) -> FastAppendFiles:
- return FastAppendFiles(
+ def fast_append(self) -> _FastAppendFiles:
+ return _FastAppendFiles(
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
)
- def merge_append(self) -> MergeAppendFiles:
- return MergeAppendFiles(
+ def merge_append(self) -> _MergeAppendFiles:
+ return _MergeAppendFiles(
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
)
- def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
- return OverwriteFiles(
+ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
+ return _OverwriteFiles(
commit_uuid=commit_uuid,
operation=Operation.OVERWRITE
if self._transaction.table_metadata.current_snapshot() is not None
@@ -631,8 +631,8 @@ class UpdateSnapshot:
snapshot_properties=self._snapshot_properties,
)
- def delete(self) -> DeleteFiles:
- return DeleteFiles(
+ def delete(self) -> _DeleteFiles:
+ return _DeleteFiles(
operation=Operation.DELETE,
transaction=self._transaction,
io=self._io,
diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index 6f31a375..1c4029a2 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -1243,3 +1243,18 @@ def test_update_metadata_log_overflow(table_v2: Table) -> None:
table_v2.metadata_location,
)
assert len(new_metadata.metadata_log) == 1
+
+
+def test_table_module_refactoring_backward_compatibility() -> None:
+ # TODO: Remove this in 0.9.0
+ try:
+ from pyiceberg.table import ( # noqa: F401
+ DeleteFiles,
+ FastAppendFiles,
+ MergeAppendFiles,
+ Move,
+ MoveOperation,
+ OverwriteFiles,
+ )
+ except Exception as exc:
+ raise pytest.fail("Importing moved modules should not raise an exception") from exc