This is an automated email from the ASF dual-hosted git repository.
honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 5aa451d4 Rename data_sequence_number to sequence_number in ManifestEntry (#900)
5aa451d4 is described below
commit 5aa451d41c2e7a89032a75f2e2adea31e10af309
Author: Soumya Ghosh <[email protected]>
AuthorDate: Thu Jul 11 07:57:05 2024 +0530
Rename data_sequence_number to sequence_number in ManifestEntry (#900)
---
pyiceberg/manifest.py | 58 ++++++++++++++++++++------------------------
pyiceberg/table/__init__.py | 24 +++++++++---------
tests/avro/test_file.py | 10 ++++----
tests/utils/test_manifest.py | 10 ++++----
4 files changed, 48 insertions(+), 54 deletions(-)
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index 6148d9a6..960952d0 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -377,7 +377,7 @@ MANIFEST_ENTRY_SCHEMAS = {
2: Schema(
NestedField(0, "status", IntegerType(), required=True),
NestedField(1, "snapshot_id", LongType(), required=False),
- NestedField(3, "data_sequence_number", LongType(), required=False),
+ NestedField(3, "sequence_number", LongType(), required=False),
NestedField(4, "file_sequence_number", LongType(), required=False),
NestedField(2, "data_file", DATA_FILE_TYPE[2], required=True),
),
@@ -394,10 +394,10 @@ def manifest_entry_schema_with_data_file(format_version:
TableVersion, data_file
class ManifestEntry(Record):
- __slots__ = ("status", "snapshot_id", "data_sequence_number",
"file_sequence_number", "data_file")
+ __slots__ = ("status", "snapshot_id", "sequence_number",
"file_sequence_number", "data_file")
status: ManifestEntryStatus
snapshot_id: Optional[int]
- data_sequence_number: Optional[int]
+ sequence_number: Optional[int]
file_sequence_number: Optional[int]
data_file: DataFile
@@ -408,43 +408,39 @@ class ManifestEntry(Record):
self,
new_status: ManifestEntryStatus,
new_snapshot_id: Optional[int],
- new_data_sequence_number: Optional[int],
+ new_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
self.status = new_status
self.snapshot_id = new_snapshot_id
- self.data_sequence_number = new_data_sequence_number
+ self.sequence_number = new_sequence_number
self.file_sequence_number = new_file_sequence_number
self.data_file = new_file
return self
def _wrap_append(
- self, new_snapshot_id: Optional[int], new_data_sequence_number:
Optional[int], new_file: DataFile
+ self, new_snapshot_id: Optional[int], new_sequence_number:
Optional[int], new_file: DataFile
) -> ManifestEntry:
- return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id,
new_data_sequence_number, None, new_file)
+ return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id,
new_sequence_number, None, new_file)
def _wrap_delete(
self,
new_snapshot_id: Optional[int],
- new_data_sequence_number: Optional[int],
+ new_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
- return self._wrap(
- ManifestEntryStatus.DELETED, new_snapshot_id,
new_data_sequence_number, new_file_sequence_number, new_file
- )
+ return self._wrap(ManifestEntryStatus.DELETED, new_snapshot_id,
new_sequence_number, new_file_sequence_number, new_file)
def _wrap_existing(
self,
new_snapshot_id: Optional[int],
- new_data_sequence_number: Optional[int],
+ new_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
- return self._wrap(
- ManifestEntryStatus.EXISTING, new_snapshot_id,
new_data_sequence_number, new_file_sequence_number, new_file
- )
+ return self._wrap(ManifestEntryStatus.EXISTING, new_snapshot_id,
new_sequence_number, new_file_sequence_number, new_file)
PARTITION_FIELD_SUMMARY_TYPE = StructType(
@@ -665,10 +661,10 @@ def _inherit_from_manifest(entry: ManifestEntry,
manifest: ManifestFile) -> Mani
if entry.snapshot_id is None:
entry.snapshot_id = manifest.added_snapshot_id
- # in v1 tables, the data sequence number is not persisted and can be
safely defaulted to 0
- # in v2 tables, the data sequence number should be inherited iff the entry
status is ADDED
- if entry.data_sequence_number is None and (manifest.sequence_number == 0
or entry.status == ManifestEntryStatus.ADDED):
- entry.data_sequence_number = manifest.sequence_number
+ # in v1 tables, the sequence number is not persisted and can be safely
defaulted to 0
+ # in v2 tables, the sequence number should be inherited iff the entry
status is ADDED
+ if entry.sequence_number is None and (manifest.sequence_number == 0 or
entry.status == ManifestEntryStatus.ADDED):
+ entry.sequence_number = manifest.sequence_number
# in v1 tables, the file sequence number is not persisted and can be
safely defaulted to 0
# in v2 tables, the file sequence number should be inherited iff the entry
status is ADDED
@@ -695,7 +691,7 @@ class ManifestWriter(ABC):
_existing_rows: int
_deleted_files: int
_deleted_rows: int
- _min_data_sequence_number: Optional[int]
+ _min_sequence_number: Optional[int]
_partitions: List[Record]
_reused_entry_wrapper: ManifestEntry
@@ -712,7 +708,7 @@ class ManifestWriter(ABC):
self._existing_rows = 0
self._deleted_files = 0
self._deleted_rows = 0
- self._min_data_sequence_number = None
+ self._min_sequence_number = None
self._partitions = []
self._reused_entry_wrapper = ManifestEntry()
@@ -774,7 +770,7 @@ class ManifestWriter(ABC):
"""Return the manifest file."""
# once the manifest file is generated, no more entries can be added
self.closed = True
- min_sequence_number = self._min_data_sequence_number or UNASSIGNED_SEQ
+ min_sequence_number = self._min_sequence_number or UNASSIGNED_SEQ
return ManifestFile(
manifest_path=self._output_file.location,
manifest_length=len(self._writer.output_file),
@@ -812,19 +808,17 @@ class ManifestWriter(ABC):
if (
(entry.status == ManifestEntryStatus.ADDED or entry.status ==
ManifestEntryStatus.EXISTING)
- and entry.data_sequence_number is not None
- and (self._min_data_sequence_number is None or
entry.data_sequence_number < self._min_data_sequence_number)
+ and entry.sequence_number is not None
+ and (self._min_sequence_number is None or entry.sequence_number <
self._min_sequence_number)
):
- self._min_data_sequence_number = entry.data_sequence_number
+ self._min_sequence_number = entry.sequence_number
self._writer.write_block([self.prepare_entry(entry)])
return self
def add(self, entry: ManifestEntry) -> ManifestWriter:
- if entry.data_sequence_number is not None and
entry.data_sequence_number >= 0:
- self.add_entry(
- self._reused_entry_wrapper._wrap_append(self._snapshot_id,
entry.data_sequence_number, entry.data_file)
- )
+ if entry.sequence_number is not None and entry.sequence_number >= 0:
+
self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id,
entry.sequence_number, entry.data_file))
else:
self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None,
entry.data_file))
return self
@@ -832,7 +826,7 @@ class ManifestWriter(ABC):
def delete(self, entry: ManifestEntry) -> ManifestWriter:
self.add_entry(
self._reused_entry_wrapper._wrap_delete(
- self._snapshot_id, entry.data_sequence_number,
entry.file_sequence_number, entry.data_file
+ self._snapshot_id, entry.sequence_number,
entry.file_sequence_number, entry.data_file
)
)
return self
@@ -840,7 +834,7 @@ class ManifestWriter(ABC):
def existing(self, entry: ManifestEntry) -> ManifestWriter:
self.add_entry(
self._reused_entry_wrapper._wrap_existing(
- entry.snapshot_id, entry.data_sequence_number,
entry.file_sequence_number, entry.data_file
+ entry.snapshot_id, entry.sequence_number,
entry.file_sequence_number, entry.data_file
)
)
return self
@@ -885,7 +879,7 @@ class ManifestWriterV2(ManifestWriter):
}
def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
- if entry.data_sequence_number is None:
+ if entry.sequence_number is None:
if entry.snapshot_id is not None and entry.snapshot_id !=
self._snapshot_id:
raise ValueError(f"Found unassigned sequence number for an
entry from snapshot: {entry.snapshot_id}")
if entry.status != ManifestEntryStatus.ADDED:
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 39bcfc2e..4080f3a0 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1888,7 +1888,7 @@ def _open_manifest(
]
-def _min_data_file_sequence_number(manifests: List[ManifestFile]) -> int:
+def _min_sequence_number(manifests: List[ManifestFile]) -> int:
try:
return min(
manifest.min_sequence_number or INITIAL_SEQUENCE_NUMBER
@@ -1949,11 +1949,11 @@ class DataScan(TableScan):
# shared instance across multiple threads.
return lambda data_file: expression_evaluator(partition_schema,
partition_expr, self.case_sensitive)(data_file.partition)
- def _check_sequence_number(self, min_data_sequence_number: int, manifest:
ManifestFile) -> bool:
+ def _check_sequence_number(self, min_sequence_number: int, manifest:
ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are
older than the data.
Args:
- min_data_sequence_number (int): The minimal sequence number.
+ min_sequence_number (int): The minimal sequence number.
manifest (ManifestFile): A ManifestFile that can be either data or
deletes.
Returns:
@@ -1962,7 +1962,7 @@ class DataScan(TableScan):
return manifest.content == ManifestContent.DATA or (
# Not interested in deletes that are older than the data
manifest.content == ManifestContent.DELETES
- and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >=
min_data_sequence_number
+ and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >=
min_sequence_number
)
def plan_files(self) -> Iterable[FileScanTask]:
@@ -1994,10 +1994,10 @@ class DataScan(TableScan):
self.table_metadata.schema(), self.row_filter,
self.case_sensitive, self.options.get("include_empty_files") == "true"
).eval
- min_data_sequence_number = _min_data_file_sequence_number(manifests)
+ min_sequence_number = _min_sequence_number(manifests)
data_entries: List[ManifestEntry] = []
- positional_delete_entries = SortedList(key=lambda entry:
entry.data_sequence_number or INITIAL_SEQUENCE_NUMBER)
+ positional_delete_entries = SortedList(key=lambda entry:
entry.sequence_number or INITIAL_SEQUENCE_NUMBER)
executor = ExecutorFactory.get_or_create()
for manifest_entry in chain(
@@ -2011,7 +2011,7 @@ class DataScan(TableScan):
metrics_evaluator,
)
for manifest in manifests
- if self._check_sequence_number(min_data_sequence_number,
manifest)
+ if self._check_sequence_number(min_sequence_number,
manifest)
],
)
):
@@ -3150,7 +3150,7 @@ class _SnapshotProducer(UpdateTableMetadata[U],
Generic[U]):
ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=self._snapshot_id,
- data_sequence_number=None,
+ sequence_number=None,
file_sequence_number=None,
data_file=data_file,
)
@@ -3353,7 +3353,7 @@ class DeleteFiles(_SnapshotProducer["DeleteFiles"]):
return ManifestEntry(
status=status,
snapshot_id=entry.snapshot_id,
- data_sequence_number=entry.data_sequence_number,
+ sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
@@ -3537,7 +3537,7 @@ class OverwriteFiles(_SnapshotProducer["OverwriteFiles"]):
ManifestEntry(
status=ManifestEntryStatus.EXISTING,
snapshot_id=entry.snapshot_id,
-
data_sequence_number=entry.data_sequence_number,
+ sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
@@ -3568,7 +3568,7 @@ class OverwriteFiles(_SnapshotProducer["OverwriteFiles"]):
ManifestEntry(
status=ManifestEntryStatus.DELETED,
snapshot_id=entry.snapshot_id,
- data_sequence_number=entry.data_sequence_number,
+ sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
@@ -4016,7 +4016,7 @@ class InspectTable:
entries.append({
"status": entry.status.value,
"snapshot_id": entry.snapshot_id,
- "sequence_number": entry.data_sequence_number,
+ "sequence_number": entry.sequence_number,
"file_sequence_number": entry.file_sequence_number,
"data_file": {
"content": entry.data_file.content,
diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py
index 4df13230..981aab25 100644
--- a/tests/avro/test_file.py
+++ b/tests/avro/test_file.py
@@ -140,7 +140,7 @@ def
test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None:
entry = ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
- data_sequence_number=0,
+ sequence_number=0,
file_sequence_number=0,
data_file=data_file,
)
@@ -173,7 +173,7 @@ def
test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None:
v2_entry = todict(entry)
# These are not written in V1
- del v2_entry["data_sequence_number"]
+ del v2_entry["sequence_number"]
del v2_entry["file_sequence_number"]
del v2_entry["data_file"]["content"]
del v2_entry["data_file"]["equality_ids"]
@@ -206,7 +206,7 @@ def
test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None:
entry = ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
- data_sequence_number=0,
+ sequence_number=0,
file_sequence_number=0,
data_file=data_file,
)
@@ -263,7 +263,7 @@ def
test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in
entry = ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
- data_sequence_number=0,
+ sequence_number=0,
file_sequence_number=0,
data_file=data_file,
)
@@ -305,7 +305,7 @@ def
test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
# Not part of v1
- data_sequence_number=None,
+ sequence_number=None,
file_sequence_number=None,
data_file=v1_datafile,
)
diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py
index ecb99e28..ef33b16b 100644
--- a/tests/utils/test_manifest.py
+++ b/tests/utils/test_manifest.py
@@ -66,7 +66,7 @@ def test_read_manifest_entry(generated_manifest_entry_file:
str) -> None:
assert manifest_entry.status == ManifestEntryStatus.ADDED
assert manifest_entry.snapshot_id == 8744736658442914487
- assert manifest_entry.data_sequence_number == 0
+ assert manifest_entry.sequence_number == 0
assert isinstance(manifest_entry.data_file, DataFile)
data_file = manifest_entry.data_file
@@ -250,7 +250,7 @@ def test_read_manifest_v1(generated_manifest_file_file_v1:
str) -> None:
entry = entries[0]
- assert entry.data_sequence_number == 0
+ assert entry.sequence_number == 0
assert entry.file_sequence_number == 0
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED
@@ -300,7 +300,7 @@ def test_read_manifest_v2(generated_manifest_file_file_v2:
str) -> None:
entry = entries[0]
- assert entry.data_sequence_number == 3
+ assert entry.sequence_number == 3
assert entry.file_sequence_number == 3
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED
@@ -379,7 +379,7 @@ def test_write_manifest(
assert manifest_entry.status == ManifestEntryStatus.ADDED
assert manifest_entry.snapshot_id == 8744736658442914487
- assert manifest_entry.data_sequence_number == -1 if format_version ==
1 else 3
+ assert manifest_entry.sequence_number == -1 if format_version == 1
else 3
assert isinstance(manifest_entry.data_file, DataFile)
data_file = manifest_entry.data_file
@@ -556,7 +556,7 @@ def test_write_manifest_list(
entry = entries[0]
- assert entry.data_sequence_number == 0 if format_version == 1 else 3
+ assert entry.sequence_number == 0 if format_version == 1 else 3
assert entry.file_sequence_number == 0 if format_version == 1 else 3
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED