Fokko commented on code in PR #6775:
URL: https://github.com/apache/iceberg/pull/6775#discussion_r1199829211


##########
python/pyiceberg/table/__init__.py:
##########
@@ -401,9 +423,38 @@ def plan_files(self) -> Iterator[FileScanTask]:
                             metrics_evaluator,
                         )
                         for manifest in manifests
+                        if (manifest.content is None or manifest.content == 
ManifestContent.DATA)

Review Comment:
   I don't like this; I'm going to separate this out into a function.



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -498,6 +504,49 @@ def expression_to_pyarrow(expr: BooleanExpression) -> 
pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+@lru_cache
+def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> 
ds.FileFormat:
+    if file_format == FileFormat.PARQUET:
+        return ds.ParquetFileFormat(**kwargs)
+    else:
+        raise ValueError(f"Unsupported file format: {file_format}")
+
+
+def _construct_fragment(fs: FileSystem, data_file: DataFile, 
file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment:
+    _, path = PyArrowFileIO.parse_location(data_file.file_path)
+    return _get_file_format(data_file.file_format, 
**file_format_kwargs).make_fragment(path, fs)
+
+
+def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, 
pa.ChunkedArray]:
+    delete_fragment = _construct_fragment(
+        fs, data_file, file_format_kwargs={"dictionary_columns": 
("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE}
+    )
+    table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
+    table = table.unify_dictionaries()
+    return {
+        file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
+        for file in table.column("file_path").chunks[0].dictionary
+    }
+
+
+def _create_positional_deletes_indices(positional_deletes: 
List[pa.ChunkedArray], fn_rows: Callable[[], int]) -> pa.Array:
+    sorted_deleted = merge(*positional_deletes)
+
+    def generator() -> Generator[int, None, None]:
+        deleted_pos = next(sorted_deleted).as_py()  # type: ignore
+        for pos in range(fn_rows()):
+            if deleted_pos == pos:
+                try:
+                    deleted_pos = next(sorted_deleted).as_py()  # type: ignore

Review Comment:
   Oof, nice catch!
   
   ```python
   >>> sorted_deleted = iter([1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 4, 5, 6])
   >>> deleted_pos = next(sorted_deleted)
   >>> for pos in range(10):
   ...     if deleted_pos == pos:
   ...         while deleted_pos == pos:
   ...             try:
   ...                 deleted_pos = next(sorted_deleted)
   ...             except StopIteration:
   ...                 deleted_pos = -1
   ...     else:
   ...         print(f"yield {pos}")
   ...         
   ... 
   yield 0
   yield 3
   yield 7
   yield 8
   yield 9
   ```



##########
python/pyiceberg/table/__init__.py:
##########
@@ -301,35 +304,44 @@ class ScanTask(ABC):
 @dataclass(init=False)
 class FileScanTask(ScanTask):
     file: DataFile
+    delete_files: Set[DataFile]
     start: int
     length: int
 
-    def __init__(self, data_file: DataFile, start: Optional[int] = None, 
length: Optional[int] = None):
+    def __init__(
+        self,
+        data_file: DataFile,
+        delete_files: Optional[Set[DataFile]] = None,
+        start: Optional[int] = None,
+        length: Optional[int] = None,
+    ):
         self.file = data_file
+        self.delete_files = delete_files or set()
         self.start = start or 0
         self.length = length or data_file.file_size_in_bytes
 
 
-def _check_content(file: DataFile) -> DataFile:
-    try:
-        if file.content == ManifestContent.DELETES:
-            raise ValueError("PyIceberg does not support deletes: 
https://github.com/apache/iceberg/issues/6568")
-        return file
-    except AttributeError:
-        # If the attribute is not there, it is a V1 record
-        return file
-
-
 def _open_manifest(
     io: FileIO,
     manifest: ManifestFile,
     partition_filter: Callable[[DataFile], bool],
     metrics_evaluator: Callable[[DataFile], bool],
-) -> List[FileScanTask]:
-    all_files = files(io.new_input(manifest.manifest_path))
-    matching_partition_files = filter(partition_filter, all_files)
-    matching_partition_data_files = map(_check_content, 
matching_partition_files)
-    return [FileScanTask(file) for file in matching_partition_data_files if 
metrics_evaluator(file)]
+) -> List[DataFile]:
+    result_manifests = files(io.new_input(manifest.manifest_path))
+    result_manifests = filter(partition_filter, result_manifests)
+    return [file for file in result_manifests if metrics_evaluator(file)]
+
+
+def _min_sequence_number(manifests: List[ManifestFile]) -> int:

Review Comment:
   I like it, thanks!



##########
python/pyiceberg/table/__init__.py:
##########
@@ -301,35 +304,44 @@ class ScanTask(ABC):
 @dataclass(init=False)
 class FileScanTask(ScanTask):
     file: DataFile
+    delete_files: Set[DataFile]
     start: int
     length: int
 
-    def __init__(self, data_file: DataFile, start: Optional[int] = None, 
length: Optional[int] = None):
+    def __init__(
+        self,
+        data_file: DataFile,
+        delete_files: Optional[Set[DataFile]] = None,
+        start: Optional[int] = None,
+        length: Optional[int] = None,
+    ):
         self.file = data_file
+        self.delete_files = delete_files or set()
         self.start = start or 0
         self.length = length or data_file.file_size_in_bytes
 
 
-def _check_content(file: DataFile) -> DataFile:
-    try:
-        if file.content == ManifestContent.DELETES:
-            raise ValueError("PyIceberg does not support deletes: 
https://github.com/apache/iceberg/issues/6568")
-        return file
-    except AttributeError:
-        # If the attribute is not there, it is a V1 record
-        return file
-
-
 def _open_manifest(
     io: FileIO,
     manifest: ManifestFile,
     partition_filter: Callable[[DataFile], bool],
     metrics_evaluator: Callable[[DataFile], bool],
-) -> List[FileScanTask]:
-    all_files = files(io.new_input(manifest.manifest_path))
-    matching_partition_files = filter(partition_filter, all_files)
-    matching_partition_data_files = map(_check_content, 
matching_partition_files)
-    return [FileScanTask(file) for file in matching_partition_data_files if 
metrics_evaluator(file)]
+) -> List[DataFile]:
+    result_manifests = files(io.new_input(manifest.manifest_path))
+    result_manifests = filter(partition_filter, result_manifests)

Review Comment:
   I agree with you. I'm not sure why I changed this in the first place.



##########
python/pyiceberg/table/__init__.py:
##########
@@ -401,9 +423,38 @@ def plan_files(self) -> Iterator[FileScanTask]:
                             metrics_evaluator,
                         )
                         for manifest in manifests
+                        if (manifest.content is None or manifest.content == 
ManifestContent.DATA)
+                        or (
+                            # Not interested in deletes that are older than 
the data
+                            manifest.content == ManifestContent.DELETES
+                            and (manifest.sequence_number or 
INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
+                        )
                     ],
                 )
+            ):
+                if datafile.content is None or datafile.content == 
DataFileContent.DATA:
+                    data_datafiles.append(datafile)
+                elif datafile.content == DataFileContent.POSITION_DELETES:
+                    deletes_positional.append(datafile)
+                elif datafile.content == DataFileContent.EQUALITY_DELETES:

Review Comment:
   I left it out for now since it is already quite a hefty PR



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -721,18 +773,38 @@ def _file_to_table(
         fragment_scanner = ds.Scanner.from_fragment(
             fragment=fragment,
             schema=physical_schema,
-            filter=pyarrow_filter,
+            # This will push down the query to Arrow.
+            # But in case there are positional deletes, we have to apply them 
first
+            filter=pyarrow_filter if not positional_deletes else None,
             columns=[col.name for col in file_project_schema.columns],
         )
 
+        if positional_deletes:
+            # In the case of a mask, it is a bit awkward because we first

Review Comment:
   > If I understand correctly, the problem is that we are relying on the arrow 
result to correspond 1-to-1 with the records in the file so that we can use 
position in the DataFrame as the row position in the file.
   
   Yes, this is correct.
   
   > But if we need to read deletes, we don't want to read the entire file, 
which could mean reading whole row groups that are unnecessary.
   
   Based on the row group statistics, yes.
   
   > I don't know if Arrow supports this, but it would need to.
   
   I don't think Arrow supports this today. I think we could even implement this 
on the PyIceberg side, but I don't think we should, because:
   
   - It is internal to PyArrow
   - This would pull a lot of hot code into Python, where the GIL will slow us 
down.
   
   The points that you raise above are correct. On the Arrow side we're 
looking into implementing this: https://github.com/apache/arrow/issues/35301
   
   The last comment on that issue was about adding an internal index column that 
can be used for this purpose. This way we can combine the filters and push this 
down to PyArrow (and also simplify things on the PyIceberg end; I feel like all 
the if-else branches make the code error-prone).



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -786,15 +865,39 @@ def project_table(
     rows_counter = multiprocessing.Value("i", 0)
 
     with ThreadPool() as pool:
+        # Fetch the deletes
+        deletes_per_file: Dict[str, List[ChunkedArray]] = {}

Review Comment:
   Good one, I like that a lot!



##########
python/pyiceberg/table/__init__.py:
##########
@@ -259,7 +262,7 @@ def projection(self) -> Schema:
         return snapshot_schema.select(*self.selected_fields, 
case_sensitive=self.case_sensitive)
 
     @abstractmethod
-    def plan_files(self) -> Iterator[ScanTask]:
+    def plan_files(self) -> Iterable[ScanTask]:

Review Comment:
   It won't break the code of anyone using this, but it might alarm the type 
checker.



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -28,12 +28,16 @@
 import os
 from abc import ABC, abstractmethod
 from functools import lru_cache, singledispatch
+from heapq import merge

Review Comment:
   Thanks! Are there any specific changes that you would like to see in a 
separate PR? The `heapq` is used for merging the different positional deletes.



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -498,6 +504,49 @@ def expression_to_pyarrow(expr: BooleanExpression) -> 
pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+@lru_cache
+def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> 
ds.FileFormat:
+    if file_format == FileFormat.PARQUET:
+        return ds.ParquetFileFormat(**kwargs)
+    else:
+        raise ValueError(f"Unsupported file format: {file_format}")
+
+
+def _construct_fragment(fs: FileSystem, data_file: DataFile, 
file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment:
+    _, path = PyArrowFileIO.parse_location(data_file.file_path)
+    return _get_file_format(data_file.file_format, 
**file_format_kwargs).make_fragment(path, fs)
+
+
+def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, 
pa.ChunkedArray]:
+    delete_fragment = _construct_fragment(
+        fs, data_file, file_format_kwargs={"dictionary_columns": 
("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE}
+    )
+    table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
+    table.unify_dictionaries()
+    return {
+        file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
+        for file in table.column("file_path").chunks[0].dictionary
+    }
+
+
+def _create_positional_deletes_indices(positional_deletes: 
List[pa.ChunkedArray], fn_rows: Callable[[], int]) -> pa.Array:
+    sorted_deleted = merge(*positional_deletes)
+
+    def generator() -> Generator[int, None, None]:
+        deleted_pos = next(sorted_deleted).as_py()  # type: ignore
+        for pos in range(fn_rows()):
+            if deleted_pos == pos:
+                try:
+                    deleted_pos = next(sorted_deleted).as_py()  # type: ignore
+                except StopIteration:
+                    deleted_pos = -1
+            else:
+                yield pos
+
+    # Filter on the positions
+    return pa.array(generator(), type=pa.int64())

Review Comment:
   @jorisvandenbossche  Let me know if you're interested in providing this from 
the PyArrow side :) Would be very welcome.



##########
python/pyiceberg/table/__init__.py:
##########
@@ -401,9 +423,38 @@ def plan_files(self) -> Iterator[FileScanTask]:
                             metrics_evaluator,
                         )
                         for manifest in manifests
+                        if (manifest.content is None or manifest.content == 
ManifestContent.DATA)
+                        or (
+                            # Not interested in deletes that are older than 
the data
+                            manifest.content == ManifestContent.DELETES
+                            and (manifest.sequence_number or 
INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
+                        )
                     ],
                 )
+            ):
+                if datafile.content is None or datafile.content == 
DataFileContent.DATA:
+                    data_datafiles.append(datafile)
+                elif datafile.content == DataFileContent.POSITION_DELETES:
+                    deletes_positional.append(datafile)
+                elif datafile.content == DataFileContent.EQUALITY_DELETES:
+                    raise ValueError(
+                        "PyIceberg does not yet support equality deletes: 
https://github.com/apache/iceberg/issues/6568"
+                    )
+                else:
+                    raise ValueError(f"Unknown DataFileContent: 
{datafile.content}")
+
+        return [
+            FileScanTask(data_file, 
delete_files=self._match_deletes_to_datafile(data_file, deletes_positional))
+            for data_file in data_datafiles
+        ]
+
+    def _match_deletes_to_datafile(self, data_file: DataFile, 
positional_delete_files: List[DataFile]) -> Set[DataFile]:
+        return set(

Review Comment:
   Ahh, I see. Yes. I agree. I also recall this from the Java implementation, 
but I was confused with the sequence numbers on the manifest-list level.
   
   I've added a sorted list for collecting the deletes by sequence number, so 
that we can efficiently bisect them and select only the deletes that came after 
the data file.



##########
python/tests/io/test_pyarrow.py:
##########
@@ -1186,6 +1194,63 @@ def 
test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str
     assert "Could not find field with name unknown_field, case_sensitive=True" 
in str(exc_info.value)
 
 
+@pytest.fixture
+def deletes_file(tmp_path: str, example_task: FileScanTask) -> str:
+    path = example_task.file.file_path
+    table = pa.table({"file_path": [path, path, path], "pos": [1, 3, 5]})
+
+    deletes_file_path = f"{tmp_path}/deletes.parquet"
+    pq.write_table(table, deletes_file_path)
+
+    return deletes_file_path
+
+
+def test_read_deletes(deletes_file: str, example_task: FileScanTask) -> None:
+    deletes = _read_deletes(LocalFileSystem(), 
DataFile(file_path=deletes_file, file_format=FileFormat.PARQUET))
+    assert set(deletes.keys()) == {example_task.file.file_path}
+    assert list(deletes.values())[0] == pa.chunked_array([[1, 3, 5]])
+
+
+def test_delete(deletes_file: str, example_task: FileScanTask, 
table_schema_simple: Schema) -> None:

Review Comment:
   I was waiting for the integration tests to be added. I've added some more 
tests :+1:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to