gabeiglio commented on code in PR #2176:
URL: https://github.com/apache/iceberg-python/pull/2176#discussion_r2214932195


##########
pyiceberg/table/update/validate.py:
##########
@@ -14,19 +14,267 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from bisect import bisect_left
 from typing import Iterator, Optional, Set
 
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.expressions import BooleanExpression
 from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, 
_InclusiveMetricsEvaluator
-from pyiceberg.manifest import ManifestContent, ManifestEntry, 
ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import (
+    INITIAL_SEQUENCE_NUMBER,
+    DataFile,
+    DataFileContent,
+    ManifestContent,
+    ManifestEntry,
+    ManifestEntryStatus,
+    ManifestFile,
+)
+from pyiceberg.partitioning import PartitionMap, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
 from pyiceberg.typedef import Record
 
 VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, 
Operation.REPLACE, Operation.DELETE}
 VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, 
Operation.OVERWRITE}
+VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Set[Operation] = {Operation.DELETE, 
Operation.OVERWRITE}
+
+
class _PositionDeletes:
    """Collect position-delete manifest entries and index them by sequence number.

    Entries are buffered through ``add_entry`` and sorted lazily, the first time
    the index is read. Once the index has been built, no more entries can be added.
    """

    def __init__(self) -> None:
        # State is created per instance: lists declared on the class body would be
        # shared, so every instance would append to the same buffer.

        # Indexed state
        self._seqs: list[int] = []
        self._entries: list[ManifestEntry] = []

        # Buffer used to hold files before indexing
        self._buffer: list[ManifestEntry] = []
        self._indexed: bool = False

    def _index_if_needed(self) -> None:
        """Sort the buffered entries by sequence number on first use."""
        if self._indexed:
            return
        self._entries = sorted(self._buffer, key=lambda entry: _get_sequence_number_or_raise(entry))
        self._seqs = [_get_sequence_number_or_raise(entry) for entry in self._entries]
        self._indexed = True

    def add_entry(self, entry: ManifestEntry) -> None:
        """Buffer ``entry`` for indexing.

        Raises:
            Exception: If the index has already been built.
        """
        if self._indexed:
            raise Exception("Can't add files upon indexing.")
        self._buffer.append(entry)

    def filter(self, seq: int) -> list[ManifestEntry]:
        """Return the entries whose sequence number is greater than or equal to ``seq``.

        The result is a new list ordered by sequence number; it is empty when no
        entry qualifies.
        """
        self._index_if_needed()
        # _seqs is sorted, so bisect_left gives the position of the first entry
        # with a sequence number >= seq (len(_seqs) when there is none). Slicing
        # from there covers the "nothing matches" and "everything matches" cases.
        start = bisect_left(self._seqs, seq)
        return self._entries[start:]

    def referenced_delete_files(self) -> list[ManifestEntry]:
        """Return all indexed entries, ordered by sequence number."""
        self._index_if_needed()
        return self._entries

    def is_empty(self) -> bool:
        """Return True when the index holds no entries."""
        self._index_if_needed()
        return not self._entries
+
+
class _EqualityDeletes:
    """Collect equality-delete manifest entries and index them by sequence number.

    Entries are buffered through ``add_entry`` and sorted lazily, the first time
    the index is read. Once the index has been built, no more entries can be added.
    """

    def __init__(self) -> None:
        # State is created per instance: lists declared on the class body would be
        # shared, so every instance would append to the same buffer.

        # Indexed state
        self._seqs: list[int] = []
        self._entries: list[ManifestEntry] = []

        # Buffer used to hold files before indexing
        self._buffer: list[ManifestEntry] = []
        self._indexed: bool = False

    def _index_if_needed(self) -> None:
        """Sort the buffered entries by sequence number on first use."""
        if self._indexed:
            return
        self._entries = sorted(self._buffer, key=lambda entry: _get_sequence_number_or_raise(entry))
        self._seqs = [_get_sequence_number_or_raise(entry) for entry in self._entries]
        self._indexed = True

    def add_entry(self, spec: PartitionSpec, entry: ManifestEntry) -> None:
        """Buffer ``entry`` for indexing; ``spec`` is accepted but not used yet.

        Raises:
            Exception: If the index has already been built.
        """
        # TODO: Equality deletes should consider the spec to get the equality fields
        if self._indexed:
            raise Exception("Can't add files upon indexing.")
        self._buffer.append(entry)

    def filter(self, seq: int, entry: ManifestEntry) -> list[ManifestEntry]:
        """Return the entries whose sequence number is greater than or equal to ``seq``.

        ``entry`` is currently unused. The result is a new list ordered by sequence
        number; it is empty when no entry qualifies.
        """
        # NOTE(review): equality deletes are normally applied only to data files
        # with a strictly smaller sequence number - confirm that callers adjust
        # `seq` accordingly, since the comparison here is inclusive.
        self._index_if_needed()
        # _seqs is sorted, so bisect_left gives the position of the first entry
        # with a sequence number >= seq (len(_seqs) when there is none). Slicing
        # from there covers the "nothing matches" and "everything matches" cases.
        start = bisect_left(self._seqs, seq)
        return self._entries[start:]

    def referenced_delete_files(self) -> list[ManifestEntry]:
        """Return all indexed entries, ordered by sequence number."""
        self._index_if_needed()
        return self._entries

    def is_empty(self) -> bool:
        """Return True when the index holds no entries."""
        self._index_if_needed()
        return not self._entries
+
+
def _find_start_index(seqs: list[int], seq: int) -> int:
    """Return the index of the first element of ``seqs`` that is >= ``seq``.

    ``seqs`` must be sorted in ascending order. When ``seq`` occurs several times
    the index of its first occurrence is returned; when every element is smaller
    than ``seq`` the result is ``len(seqs)``.
    """
    # The insertion point is the answer whether or not `seq` itself is present:
    # a caller slicing from it still gets every later sequence number.
    return bisect_left(seqs, seq)
+
+
def _get_sequence_number_or_raise(entry: ManifestEntry) -> int:
    """Return the sequence number of ``entry``.

    Raises:
        ValidationException: If ``entry.sequence_number`` is None.
    """
    seq = entry.sequence_number
    # Test for None explicitly instead of relying on truthiness: 0 is falsy, yet
    # it can be a legitimate sequence number and must not be rejected.
    if seq is None:
        raise ValidationException("ManifestEntry does not have a sequence number")
    return seq
+
+
+class DeleteFileIndex:
+    """
+    An index of delete files by sequence number.
+
+    Use forDataFile(int, DataFile) or forEntry(ManifestEntry) to get the 
delete files to apply to a given data file
+    """
+
+    _global_deletes: _EqualityDeletes
+    _pos_deletes_by_path: dict[str, _PositionDeletes]
+    _pos_deletes_by_partition: PartitionMap[_PositionDeletes]
+    _eq_deletes_by_partition: PartitionMap[_EqualityDeletes]
+    _has_eq_deletes: bool
+    _has_pos_deletes: bool
+    _is_empty: bool
+
+    def __init__(
+        self,
+        delete_entries: list[ManifestEntry],

Review Comment:
   Yes, the reason for the discrepancy is that I did not want to make the PR 
much bigger (it is already big), so I tried to trim down some functionality that 
was not immediately needed by the validation. It would cost nothing to add an extra 
check for DELETE_FILE inside the DeleteFileIndex, so I might as well add it. And I 
hear your feedback on the docstrings; I'll add them as well. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to