sungwy commented on code in PR #2176:
URL: https://github.com/apache/iceberg-python/pull/2176#discussion_r2214707385


##########
pyiceberg/table/update/validate.py:
##########
@@ -14,19 +14,267 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from bisect import bisect_left
 from typing import Iterator, Optional, Set
 
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.expressions import BooleanExpression
 from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, 
_InclusiveMetricsEvaluator
-from pyiceberg.manifest import ManifestContent, ManifestEntry, 
ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import (
+    INITIAL_SEQUENCE_NUMBER,
+    DataFile,
+    DataFileContent,
+    ManifestContent,
+    ManifestEntry,
+    ManifestEntryStatus,
+    ManifestFile,
+)
+from pyiceberg.partitioning import PartitionMap, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
 from pyiceberg.typedef import Record
 
 VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, 
Operation.REPLACE, Operation.DELETE}
 VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, 
Operation.OVERWRITE}
+VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Set[Operation] = {Operation.DELETE, 
Operation.OVERWRITE}
+
+
+class _PositionDeletes:
+    # Indexed state
+    _seqs: list[int] = []
+    _entries: list[ManifestEntry] = []
+
+    # Buffer used to hold files before indexing
+    _buffer: list[ManifestEntry] = []
+    _indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if self._indexed is False:
+            self._entries = sorted(self._buffer, key=lambda entry: 
_get_sequence_number_or_raise(entry))
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in 
self._entries]
+            self._indexed = True
+
+    def add_entry(self, entry: ManifestEntry) -> None:
+        if self._indexed:
+            raise Exception("Can't add files upon indexing.")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int) -> list[ManifestEntry]:
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        matching_entries_count: int = len(self._entries) - start
+        return self._entries[matching_entries_count:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:
+        self._index_if_needed()
+        return self._entries
+
+    def is_empty(self) -> bool:
+        self._index_if_needed()
+        return len(self._entries) > 0
+
+
+class _EqualityDeletes:
+    # Indexed state
+    _seqs: list[int] = []
+    _entries: list[ManifestEntry] = []
+
+    # Buffer used to hold files before indexing
+    _buffer: list[ManifestEntry] = []
+    _indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if self._indexed is False:
+            self._entries = sorted(self._buffer, key=lambda entry: 
_get_sequence_number_or_raise(entry))
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in 
self._entries]
+            self._indexed = True
+
+    def add_entry(self, spec: PartitionSpec, entry: ManifestEntry) -> None:
+        # TODO: Equality deletes should consider the spec to get the equality 
fields
+        if self._indexed:
+            raise Exception("Can't add files upon indexing.")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int, entry: ManifestEntry) -> list[ManifestEntry]:
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        matching_entries_count: int = len(self._entries) - start
+        return self._entries[matching_entries_count:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:
+        self._index_if_needed()
+        return self._entries
+
+    def is_empty(self) -> bool:
+        self._index_if_needed()
+        return len(self._entries) > 0
+
+
+def _find_start_index(seqs: list[int], seq: int) -> int:
+    pos: int = bisect_left(seqs, seq)
+    if pos != len(seqs) and seqs[pos] == seqs:

Review Comment:
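   This looks like a bug: `seqs[pos] == seqs` compares a single sequence number 
with the whole list, which is never true, so `_find_start_index` always returns 
`-1`. It should compare against `seq`: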
   ```suggestion
       if pos != len(seqs) and seqs[pos] == seq:
   ```



##########
pyiceberg/table/update/validate.py:
##########
@@ -14,19 +14,267 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from bisect import bisect_left
 from typing import Iterator, Optional, Set
 
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.expressions import BooleanExpression
 from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, 
_InclusiveMetricsEvaluator
-from pyiceberg.manifest import ManifestContent, ManifestEntry, 
ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import (
+    INITIAL_SEQUENCE_NUMBER,
+    DataFile,
+    DataFileContent,
+    ManifestContent,
+    ManifestEntry,
+    ManifestEntryStatus,
+    ManifestFile,
+)
+from pyiceberg.partitioning import PartitionMap, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
 from pyiceberg.typedef import Record
 
 VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, 
Operation.REPLACE, Operation.DELETE}
 VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, 
Operation.OVERWRITE}
+VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Set[Operation] = {Operation.DELETE, 
Operation.OVERWRITE}
+
+
+class _PositionDeletes:
+    # Indexed state
+    _seqs: list[int] = []
+    _entries: list[ManifestEntry] = []
+
+    # Buffer used to hold files before indexing
+    _buffer: list[ManifestEntry] = []
+    _indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if self._indexed is False:
+            self._entries = sorted(self._buffer, key=lambda entry: 
_get_sequence_number_or_raise(entry))
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in 
self._entries]
+            self._indexed = True
+
+    def add_entry(self, entry: ManifestEntry) -> None:
+        if self._indexed:
+            raise Exception("Can't add files upon indexing.")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int) -> list[ManifestEntry]:
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        matching_entries_count: int = len(self._entries) - start
+        return self._entries[matching_entries_count:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:
+        self._index_if_needed()
+        return self._entries
+
+    def is_empty(self) -> bool:
+        self._index_if_needed()
+        return len(self._entries) > 0
+
+
+class _EqualityDeletes:
+    # Indexed state
+    _seqs: list[int] = []
+    _entries: list[ManifestEntry] = []
+
+    # Buffer used to hold files before indexing
+    _buffer: list[ManifestEntry] = []
+    _indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if self._indexed is False:
+            self._entries = sorted(self._buffer, key=lambda entry: 
_get_sequence_number_or_raise(entry))
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in 
self._entries]
+            self._indexed = True
+
+    def add_entry(self, spec: PartitionSpec, entry: ManifestEntry) -> None:
+        # TODO: Equality deletes should consider the spec to get the equality 
fields
+        if self._indexed:
+            raise Exception("Can't add files upon indexing.")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int, entry: ManifestEntry) -> list[ManifestEntry]:
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        matching_entries_count: int = len(self._entries) - start
+        return self._entries[matching_entries_count:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:

Review Comment:
   nit: I noticed that we are handling a list of `ManifestEntry` instead of 
handling `DeleteFile` (or `DataFile`) like we do in the Java implementation or 
in the `DeleteFileIndex` class defined below. Can we rename this function to 
reflect what it does now, which is to collect a list of `ManifestEntry` objects 
that hold delete files?



##########
pyiceberg/table/update/validate.py:
##########
@@ -14,19 +14,267 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from bisect import bisect_left
 from typing import Iterator, Optional, Set
 
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.expressions import BooleanExpression
 from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, 
_InclusiveMetricsEvaluator
-from pyiceberg.manifest import ManifestContent, ManifestEntry, 
ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import (
+    INITIAL_SEQUENCE_NUMBER,
+    DataFile,
+    DataFileContent,
+    ManifestContent,
+    ManifestEntry,
+    ManifestEntryStatus,
+    ManifestFile,
+)
+from pyiceberg.partitioning import PartitionMap, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
 from pyiceberg.typedef import Record
 
 VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, 
Operation.REPLACE, Operation.DELETE}
 VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, 
Operation.OVERWRITE}
+VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Set[Operation] = {Operation.DELETE, 
Operation.OVERWRITE}
+
+
+class _PositionDeletes:
+    # Indexed state
+    _seqs: list[int] = []
+    _entries: list[ManifestEntry] = []
+
+    # Buffer used to hold files before indexing
+    _buffer: list[ManifestEntry] = []
+    _indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if self._indexed is False:
+            self._entries = sorted(self._buffer, key=lambda entry: 
_get_sequence_number_or_raise(entry))
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in 
self._entries]
+            self._indexed = True
+
+    def add_entry(self, entry: ManifestEntry) -> None:
+        if self._indexed:
+            raise Exception("Can't add files upon indexing.")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int) -> list[ManifestEntry]:
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        matching_entries_count: int = len(self._entries) - start
+        return self._entries[matching_entries_count:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:
+        self._index_if_needed()
+        return self._entries
+
+    def is_empty(self) -> bool:
+        self._index_if_needed()
+        return len(self._entries) > 0
+
+
+class _EqualityDeletes:
+    # Indexed state
+    _seqs: list[int] = []
+    _entries: list[ManifestEntry] = []
+
+    # Buffer used to hold files before indexing
+    _buffer: list[ManifestEntry] = []
+    _indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if self._indexed is False:
+            self._entries = sorted(self._buffer, key=lambda entry: 
_get_sequence_number_or_raise(entry))
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in 
self._entries]
+            self._indexed = True
+
+    def add_entry(self, spec: PartitionSpec, entry: ManifestEntry) -> None:
+        # TODO: Equality deletes should consider the spec to get the equality 
fields
+        if self._indexed:
+            raise Exception("Can't add files upon indexing.")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int, entry: ManifestEntry) -> list[ManifestEntry]:
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:

Review Comment:
   This looks like the logic for filtering position deletes: the `entry` 
argument is never used. Should we implement an equivalent of 
[canContainEqDeletesForFile](https://github.com/apache/iceberg/blob/9922e6dc4c76c8c3d4ec396b3932b57f48a1d468/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java#L216) 
to ensure that we are filtering equality deletes correctly?



##########
pyiceberg/table/update/validate.py:
##########
@@ -14,19 +14,267 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from bisect import bisect_left
 from typing import Iterator, Optional, Set
 
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.expressions import BooleanExpression
 from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, 
_InclusiveMetricsEvaluator
-from pyiceberg.manifest import ManifestContent, ManifestEntry, 
ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import (
+    INITIAL_SEQUENCE_NUMBER,
+    DataFile,
+    DataFileContent,
+    ManifestContent,
+    ManifestEntry,
+    ManifestEntryStatus,
+    ManifestFile,
+)
+from pyiceberg.partitioning import PartitionMap, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
 from pyiceberg.typedef import Record
 
 VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, 
Operation.REPLACE, Operation.DELETE}
 VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, 
Operation.OVERWRITE}
+VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Set[Operation] = {Operation.DELETE, 
Operation.OVERWRITE}
+
+
+class _PositionDeletes:
+    # Indexed state
+    _seqs: list[int] = []
+    _entries: list[ManifestEntry] = []
+
+    # Buffer used to hold files before indexing
+    _buffer: list[ManifestEntry] = []
+    _indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if self._indexed is False:
+            self._entries = sorted(self._buffer, key=lambda entry: 
_get_sequence_number_or_raise(entry))
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in 
self._entries]
+            self._indexed = True
+
+    def add_entry(self, entry: ManifestEntry) -> None:
+        if self._indexed:
+            raise Exception("Can't add files upon indexing.")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int) -> list[ManifestEntry]:
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        matching_entries_count: int = len(self._entries) - start
+        return self._entries[matching_entries_count:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:
+        self._index_if_needed()
+        return self._entries
+
+    def is_empty(self) -> bool:
+        self._index_if_needed()
+        return len(self._entries) > 0
+
+
+class _EqualityDeletes:
+    # Indexed state
+    _seqs: list[int] = []
+    _entries: list[ManifestEntry] = []
+
+    # Buffer used to hold files before indexing
+    _buffer: list[ManifestEntry] = []
+    _indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if self._indexed is False:
+            self._entries = sorted(self._buffer, key=lambda entry: 
_get_sequence_number_or_raise(entry))
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in 
self._entries]
+            self._indexed = True
+
+    def add_entry(self, spec: PartitionSpec, entry: ManifestEntry) -> None:
+        # TODO: Equality deletes should consider the spec to get the equality 
fields
+        if self._indexed:
+            raise Exception("Can't add files upon indexing.")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int, entry: ManifestEntry) -> list[ManifestEntry]:
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        matching_entries_count: int = len(self._entries) - start
+        return self._entries[matching_entries_count:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:
+        self._index_if_needed()
+        return self._entries
+
+    def is_empty(self) -> bool:
+        self._index_if_needed()
+        return len(self._entries) > 0
+
+
+def _find_start_index(seqs: list[int], seq: int) -> int:
+    pos: int = bisect_left(seqs, seq)
+    if pos != len(seqs) and seqs[pos] == seqs:
+        return pos
+    return -1
+
+
+def _get_sequence_number_or_raise(entry: ManifestEntry) -> int:
+    if seq := entry.sequence_number:
+        return seq
+    raise ValidationException("ManifestEntry does not have a sequence number")
+
+
+class DeleteFileIndex:
+    """
+    An index of delete files by sequence number.
+
+    Use forDataFile(int, DataFile) or forEntry(ManifestEntry) to get the 
delete files to apply to a given data file
+    """
+
+    _global_deletes: _EqualityDeletes
+    _pos_deletes_by_path: dict[str, _PositionDeletes]
+    _pos_deletes_by_partition: PartitionMap[_PositionDeletes]
+    _eq_deletes_by_partition: PartitionMap[_EqualityDeletes]
+    _has_eq_deletes: bool
+    _has_pos_deletes: bool
+    _is_empty: bool
+
+    def __init__(
+        self,
+        delete_entries: list[ManifestEntry],

Review Comment:
   I think it would be helpful to have a more descriptive docstring explaining 
the inputs and outputs.
   
   For example, I've noticed a discrepancy in the inputs for the 
`DeleteFileIndex` in PyIceberg compared to Java. Here, we are using a list of 
`ManifestEntry` objects instead of a list of `DeleteFile`s. Hence, I think it 
is important that we filter the entries to ensure that we only look at entries 
whose file content is a delete file (position or equality deletes) and ignore 
entries whose content is a data file. If we are assuming that the list of 
`ManifestEntry` is already filtered when it is passed into this `__init__` 
function, I think it would be important to make note of that in the docstring.



##########
pyiceberg/table/update/validate.py:
##########
@@ -14,19 +14,267 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from bisect import bisect_left
 from typing import Iterator, Optional, Set
 
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.expressions import BooleanExpression
 from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, 
_InclusiveMetricsEvaluator
-from pyiceberg.manifest import ManifestContent, ManifestEntry, 
ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import (
+    INITIAL_SEQUENCE_NUMBER,
+    DataFile,
+    DataFileContent,
+    ManifestContent,
+    ManifestEntry,
+    ManifestEntryStatus,
+    ManifestFile,
+)
+from pyiceberg.partitioning import PartitionMap, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
 from pyiceberg.typedef import Record
 
 VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, 
Operation.REPLACE, Operation.DELETE}
 VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, 
Operation.OVERWRITE}
+VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Set[Operation] = {Operation.DELETE, 
Operation.OVERWRITE}
+
+
+class _PositionDeletes:
+    # Indexed state
+    _seqs: list[int] = []
+    _entries: list[ManifestEntry] = []
+
+    # Buffer used to hold files before indexing
+    _buffer: list[ManifestEntry] = []
+    _indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if self._indexed is False:
+            self._entries = sorted(self._buffer, key=lambda entry: 
_get_sequence_number_or_raise(entry))
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in 
self._entries]
+            self._indexed = True
+
+    def add_entry(self, entry: ManifestEntry) -> None:
+        if self._indexed:
+            raise Exception("Can't add files upon indexing.")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int) -> list[ManifestEntry]:
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        matching_entries_count: int = len(self._entries) - start
+        return self._entries[matching_entries_count:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:
+        self._index_if_needed()
+        return self._entries
+
+    def is_empty(self) -> bool:
+        self._index_if_needed()
+        return len(self._entries) > 0
+
+
+class _EqualityDeletes:
+    # Indexed state
+    _seqs: list[int] = []
+    _entries: list[ManifestEntry] = []
+
+    # Buffer used to hold files before indexing
+    _buffer: list[ManifestEntry] = []
+    _indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if self._indexed is False:
+            self._entries = sorted(self._buffer, key=lambda entry: 
_get_sequence_number_or_raise(entry))
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in 
self._entries]
+            self._indexed = True
+
+    def add_entry(self, spec: PartitionSpec, entry: ManifestEntry) -> None:
+        # TODO: Equality deletes should consider the spec to get the equality 
fields
+        if self._indexed:
+            raise Exception("Can't add files upon indexing.")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int, entry: ManifestEntry) -> list[ManifestEntry]:
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        matching_entries_count: int = len(self._entries) - start
+        return self._entries[matching_entries_count:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:
+        self._index_if_needed()
+        return self._entries
+
+    def is_empty(self) -> bool:
+        self._index_if_needed()
+        return len(self._entries) > 0
+
+
+def _find_start_index(seqs: list[int], seq: int) -> int:
+    pos: int = bisect_left(seqs, seq)
+    if pos != len(seqs) and seqs[pos] == seqs:
+        return pos
+    return -1
+
+
+def _get_sequence_number_or_raise(entry: ManifestEntry) -> int:
+    if seq := entry.sequence_number:
+        return seq
+    raise ValidationException("ManifestEntry does not have a sequence number")
+
+
+class DeleteFileIndex:
+    """
+    An index of delete files by sequence number.
+
+    Use forDataFile(int, DataFile) or forEntry(ManifestEntry) to get the 
delete files to apply to a given data file
+    """
+
+    _global_deletes: _EqualityDeletes
+    _pos_deletes_by_path: dict[str, _PositionDeletes]
+    _pos_deletes_by_partition: PartitionMap[_PositionDeletes]
+    _eq_deletes_by_partition: PartitionMap[_EqualityDeletes]
+    _has_eq_deletes: bool
+    _has_pos_deletes: bool
+    _is_empty: bool
+
+    def __init__(
+        self,
+        delete_entries: list[ManifestEntry],
+        spec_by_id: dict[int, PartitionSpec],
+        min_sequence_number: int = INITIAL_SEQUENCE_NUMBER,
+    ) -> None:
+        global_deletes: _EqualityDeletes = _EqualityDeletes()
+        eq_deletes_by_partition: PartitionMap[_EqualityDeletes] = 
PartitionMap(spec_by_id)
+        pos_deletes_by_partition: PartitionMap[_PositionDeletes] = 
PartitionMap(spec_by_id)
+        pos_deletes_by_path: dict[str, _PositionDeletes] = {}
+
+        for entry in delete_entries:
+            if entry.sequence_number is None:
+                continue
+
+            if entry.sequence_number <= min_sequence_number:
+                continue
+
+            file: DataFile = entry.data_file
+            content: DataFileContent = file.content
+
+            if content == DataFileContent.POSITION_DELETES:
+                self._add_pos_deletes(pos_deletes_by_path, 
pos_deletes_by_partition, entry)
+            elif content == DataFileContent.EQUALITY_DELETES:
+                self._add_eq_deletes(global_deletes, eq_deletes_by_partition, 
spec_by_id, entry)
+            else:
+                raise NotImplementedError(f"Unsupported content: 
{file.content}")
+
+        # Set global variables for the class
+        self._spec_by_id = spec_by_id
+        self._global_deletes = global_deletes
+        self._eq_deletes_by_partition = eq_deletes_by_partition
+        self._pos_deletes_by_partition = pos_deletes_by_partition
+        self._pos_deletes_by_path = pos_deletes_by_path
+
+        self._has_eq_deletes = global_deletes.is_empty() or 
len(eq_deletes_by_partition) > 0
+        self._has_pos_deletes = len(pos_deletes_by_partition) > 0 or 
len(pos_deletes_by_path) > 0
+        self._is_empty = not self._has_eq_deletes and not self._has_pos_deletes
+
+    def _add_pos_deletes(
+        self,
+        pos_deletes_by_path: dict[str, _PositionDeletes],
+        pos_deletes_by_partition: PartitionMap[_PositionDeletes],
+        entry: ManifestEntry,
+    ) -> None:
+        deletes: _PositionDeletes = _PositionDeletes()
+
+        # TODO: Fallback method to get file_path from lower_bounds
+        if file_path := entry.data_file.file_path:
+            if file_path not in pos_deletes_by_path:
+                pos_deletes_by_path[file_path] = deletes
+            else:
+                deletes = pos_deletes_by_path[file_path]
+        else:
+            spec_id: int = entry.data_file.spec_id
+            partition: Record = entry.data_file.partition
+            pos_deletes_by_partition.compute_if_absent(spec_id, partition, 
lambda: deletes)
+
+        deletes.add_entry(entry)
+
+    def _add_eq_deletes(
+        self,
+        global_deletes: _EqualityDeletes,
+        eq_deletes_by_partition: PartitionMap[_EqualityDeletes],
+        spec_by_id: dict[int, PartitionSpec],
+        entry: ManifestEntry,
+    ) -> None:
+        deletes: _EqualityDeletes = _EqualityDeletes()
+
+        if spec := spec_by_id.get(entry.data_file.spec_id):
+            if spec.is_unpartitioned():
+                deletes = global_deletes
+            else:
+                spec_id = spec.spec_id
+                partition = entry.data_file.partition
+                eq_deletes_by_partition.compute_if_absent(spec_id, partition, 
lambda: deletes)
+
+            deletes.add_entry(spec, entry)
+
+    def is_empty(self) -> bool:
+        return self._is_empty
+
+    def has_equality_deletes(self) -> bool:
+        return self._has_eq_deletes
+
+    def has_position_deletes(self) -> bool:
+        return self._has_pos_deletes
+
+    def for_entry(self, entry: ManifestEntry) -> list[ManifestEntry]:
+        sequence_number = _get_sequence_number_or_raise(entry)
+        return self.for_data_file(sequence_number, entry)
+
+    def for_data_file(self, sequence_number: int, entry: ManifestEntry) -> 
list[ManifestEntry]:
+        if self.is_empty():
+            return []
+
+        global_deletes = self._global_deletes.filter(sequence_number, entry)
+        pos_path_deletes = 
self._pos_deletes_by_path[entry.data_file.file_path].filter(sequence_number)
+
+        spec_id = entry.data_file.spec_id
+        partition = entry.data_file.partition
+
+        eq_deletes_by_partition: list[ManifestEntry] = []
+        if eq_deletes := self._eq_deletes_by_partition.get(spec_id, partition):
+            eq_deletes_by_partition = eq_deletes.filter(sequence_number, entry)
+
+        pos_deletes_by_partition: list[ManifestEntry] = []
+        if pos_deletes := self._pos_deletes_by_partition.get(spec_id, 
partition):
+            pos_deletes_by_partition = pos_deletes.filter(sequence_number)
+
+        return global_deletes + eq_deletes_by_partition + pos_path_deletes + 
pos_deletes_by_partition
+
+    def referenced_delete_files(self) -> list[DataFile]:
+        entries: list[ManifestEntry] = []
+

Review Comment:
   Do we need to check for `self._global_deletes` and 
`self._eq_deletes_by_partition`?



##########
pyiceberg/table/update/validate.py:
##########
@@ -14,19 +14,267 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from bisect import bisect_left
 from typing import Iterator, Optional, Set
 
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.expressions import BooleanExpression
 from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, 
_InclusiveMetricsEvaluator
-from pyiceberg.manifest import ManifestContent, ManifestEntry, 
ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import (
+    INITIAL_SEQUENCE_NUMBER,
+    DataFile,
+    DataFileContent,
+    ManifestContent,
+    ManifestEntry,
+    ManifestEntryStatus,
+    ManifestFile,
+)
+from pyiceberg.partitioning import PartitionMap, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
 from pyiceberg.typedef import Record
 
 VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, 
Operation.REPLACE, Operation.DELETE}
 VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, 
Operation.OVERWRITE}
+VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Set[Operation] = {Operation.DELETE, 
Operation.OVERWRITE}
+
+
+class _PositionDeletes:
+    # Indexed state
+    _seqs: list[int] = []
+    _entries: list[ManifestEntry] = []
+
+    # Buffer used to hold files before indexing
+    _buffer: list[ManifestEntry] = []
+    _indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if self._indexed is False:
+            self._entries = sorted(self._buffer, key=lambda entry: 
_get_sequence_number_or_raise(entry))
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in 
self._entries]
+            self._indexed = True
+
+    def add_entry(self, entry: ManifestEntry) -> None:
+        if self._indexed:
+            raise Exception("Can't add files upon indexing.")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int) -> list[ManifestEntry]:
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        matching_entries_count: int = len(self._entries) - start
+        return self._entries[matching_entries_count:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:
+        self._index_if_needed()
+        return self._entries
+
+    def is_empty(self) -> bool:
+        self._index_if_needed()
+        return len(self._entries) > 0
+
+
+class _EqualityDeletes:
+    # Indexed state
+    _seqs: list[int] = []
+    _entries: list[ManifestEntry] = []
+
+    # Buffer used to hold files before indexing
+    _buffer: list[ManifestEntry] = []
+    _indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if self._indexed is False:
+            self._entries = sorted(self._buffer, key=lambda entry: 
_get_sequence_number_or_raise(entry))
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in 
self._entries]
+            self._indexed = True
+
+    def add_entry(self, spec: PartitionSpec, entry: ManifestEntry) -> None:
+        # TODO: Equality deletes should consider the spec to get the equality 
fields
+        if self._indexed:
+            raise Exception("Can't add files upon indexing.")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int, entry: ManifestEntry) -> list[ManifestEntry]:
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        matching_entries_count: int = len(self._entries) - start
+        return self._entries[matching_entries_count:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:
+        self._index_if_needed()
+        return self._entries
+
+    def is_empty(self) -> bool:
+        self._index_if_needed()
+        return len(self._entries) > 0
+
+
+def _find_start_index(seqs: list[int], seq: int) -> int:
+    pos: int = bisect_left(seqs, seq)
+    if pos != len(seqs) and seqs[pos] == seqs:
+        return pos
+    return -1
+
+
+def _get_sequence_number_or_raise(entry: ManifestEntry) -> int:
+    if seq := entry.sequence_number:
+        return seq
+    raise ValidationException("ManifestEntry does not have a sequence number")
+
+
+class DeleteFileIndex:
+    """
+    An index of delete files by sequence number.
+
+    Use forDataFile(int, DataFile) or forEntry(ManifestEntry) to get the 
delete files to apply to a given data file
+    """
+
+    _global_deletes: _EqualityDeletes
+    _pos_deletes_by_path: dict[str, _PositionDeletes]
+    _pos_deletes_by_partition: PartitionMap[_PositionDeletes]
+    _eq_deletes_by_partition: PartitionMap[_EqualityDeletes]
+    _has_eq_deletes: bool
+    _has_pos_deletes: bool
+    _is_empty: bool
+
+    def __init__(
+        self,
+        delete_entries: list[ManifestEntry],
+        spec_by_id: dict[int, PartitionSpec],
+        min_sequence_number: int = INITIAL_SEQUENCE_NUMBER,
+    ) -> None:
+        global_deletes: _EqualityDeletes = _EqualityDeletes()
+        eq_deletes_by_partition: PartitionMap[_EqualityDeletes] = 
PartitionMap(spec_by_id)
+        pos_deletes_by_partition: PartitionMap[_PositionDeletes] = 
PartitionMap(spec_by_id)
+        pos_deletes_by_path: dict[str, _PositionDeletes] = {}
+
+        for entry in delete_entries:
+            if entry.sequence_number is None:
+                continue
+
+            if entry.sequence_number <= min_sequence_number:
+                continue
+
+            file: DataFile = entry.data_file
+            content: DataFileContent = file.content
+

Review Comment:
   We now have support for Deletion Vectors in PyIceberg. Could we also handle 
deletes that come from deletion vectors here, similar to the [Java reference 
implementation](https://github.com/apache/iceberg/blob/9922e6dc4c76c8c3d4ec396b3932b57f48a1d468/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java#L480-L481C19)?
 Including this will be important for accuracy.
   
   Here's an example of how a function in PyIceberg detects that a Delete File 
is stored in the form of a deletion vector.
   
https://github.com/apache/iceberg-python/blob/ad8263b1be048c8cb67d40efe70f494a4f1cb374/pyiceberg/io/pyarrow.py#L981-L999



##########
pyiceberg/table/update/validate.py:
##########


Review Comment:
   Looking at the java reference implementation, I noticed that the 
`DeleteFileIndex` is also used in 
[BaseDistributedDataScan](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java)
 as an output to 
[planDeletesRemotely](https://github.com/apache/iceberg/blob/9922e6dc4c76c8c3d4ec396b3932b57f48a1d468/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java#L143).
 Would it make sense to put all these new classes and some of the functions 
related to building the DeleteFileIndex into a separate module? 
`pyiceberg/table/update/delete.py` maybe?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to